some detail changes
[spider.git] / perl / DXMsg.pm
1 #!/usr/bin/perl
2 #
3 # This module impliments the message handling for a dx cluster
4 #
5 # Copyright (c) 1998 Dirk Koopman G1TLH
6 #
7 # $Id$
8
9
10 package DXMsg;
11
12 @ISA = qw(DXProt DXChannel);
13
14 use DXUtil;
15 use DXChannel;
16 use DXUser;
17 use DXM;
18 use DXCluster;
19 use DXProtVars;
20 use DXProtout;
21 use DXDebug;
22 use FileHandle;
23 use Carp;
24
25 use strict;
26 use vars qw(%work @msg $msgdir %valid);
27
28 %work = ();                # outstanding jobs
29 @msg = ();                 # messages we have
30 $msgdir = "$main::root/msg";              # directory contain the msgs
31
32 %valid = (
33   fromnode => '9,From Node',
34   tonode => '9,To Node',
35   to => '0,To',
36   from => '0,From',
37   t => '0,Msg Time,cldatetime',
38   private => '9,Private',
39   subject => '0,Subject',
40   linesreq => '0,Lines per Gob',
41   rrreq => '9,Read Confirm',
42   origin => '0,Origin',
43   lines => '5,Data',
44   stream => '9,Stream No',
45   count => '9,Gob Linecnt',
46   file => '9,File?,yesno',
47   gotit => '9,Got it Nodes,parray',
48   lines => '9,Lines,parray',
49   read => '9,Times read',
50   size => '0,Size',
51   msgno => '0,Msgno',
52 );
53
54 # allocate a new object
55 # called fromnode, tonode, from, to, datetime, private?, subject, nolinesper  
56 sub alloc                  
57 {
58   my $pkg = shift;
59   my $self = bless {}, $pkg;
60   $self->{msgno} = shift;
61   $self->{to} = shift;
62   $self->{from} = shift;
63   $self->{t} = shift;
64   $self->{private} = shift;
65   $self->{subject} = shift;
66   $self->{origin} = shift;
67   $self->{read} = shift;
68     
69   return $self;
70 }
71
72 sub workclean
73 {
74   my $ref = shift;
75   delete $ref->{lines};
76   delete $ref->{linesreq};
77   delete $ref->{tonode};
78   delete $ref->{fromnode};
79   delete $ref->{stream};
80   delete $ref->{lines};
81   delete $ref->{file};
82   delete $ref->{count};
83 }
84
85 sub process
86 {
87   my ($self, $line) = @_;
88   my @f = split /[\^\~]/, $line;
89   my ($pcno) = $f[0] =~ /^PC(\d\d)/;          # just get the number
90   
91   SWITCH: {
92     if ($pcno == 28) {                        # incoming message
93           my $t = cltounix($f[5], $f[6]);
94           my $stream = next_transno($f[2]);
95           my $ref = DXMsg->alloc($stream, $f[3], $f[4], $t, $f[7], $f[8], $f[13], '0');
96           
97           # fill in various forwarding state variables
98       $ref->{fromnode} = $f[2];
99       $ref->{tonode} = $f[1];
100           $ref->{rrreq} = $f[11];
101           $ref->{linesreq} = $f[10];
102           $ref->{stream} = $stream;
103           $ref->{count} = 0;                      # no of lines between PC31s
104           dbg('msg', "new message from $f[4] to $f[3] '$f[8]' stream $stream\n");
105       $work{"$f[1]$f[2]$stream"} = $ref;         # store in work
106           $self->send(DXProt::pc30($f[2], $f[1], $stream)); # send ack
107           last SWITCH;
108         }
109         
110     if ($pcno == 29) {                        # incoming text
111           my $ref = $work{"$f[1]$f[2]$f[3]"};
112           if ($ref) {
113             push @{$ref->{lines}}, $f[4];
114                 $ref->{count}++;
115                 if ($ref->{count} >= $ref->{linesreq}) {
116                   $self->send(DXProt::pc31($f[2], $f[1], $f[3]));
117                   dbg('msg', "stream $f[3]: $ref->{linereq} lines received\n");
118                   $ref->{count} = 0;
119                 }
120           }
121           last SWITCH;
122         }
123         
124     if ($pcno == 30) {
125           last SWITCH;
126         }
127         
128     if ($pcno == 31) {
129           last SWITCH;
130         }
131         
132     if ($pcno == 32) {                         # incoming EOM
133           dbg('msg', "stream $f[3]: EOM received\n");
134           my $ref = $work{"$f[1]$f[2]$f[3]"};
135           if ($ref) {
136             $self->send(DXProt::pc33($f[2], $f[1], $f[3]));# acknowledge it
137                 
138                 # get the next msg no - note that this has NOTHING to do with the stream number in PC protocol
139                 # store the file or message
140                 # remove extraneous rubbish from the hash
141                 # remove it from the work in progress vector
142                 # stuff it on the msg queue
143                 $ref->{msgno} = next_transno("Msgno") if !$ref->{file};
144                 $ref->store($ref->{lines});             
145                 $ref->workclean;
146                 delete $work{"$f[1]$f[2]$f[3]"};       
147                 push @msg, $ref;           
148           }
149           last SWITCH;
150         }
151         
152     if ($pcno == 33) {
153           last SWITCH;
154         }
155         
156         if ($pcno == 40) {                         # this is a file request
157           $f[3] =~ s/\\/\//og;                     # change the slashes
158           $f[3] =~ s/\.//og;                       # remove dots
159           $f[3] = lc $f[3];                        # to lower case;
160           dbg('msg', "incoming file $f[3]\n");
161           last SWITCH if $f[3] =~ /^\/(perl|cmd|local_cmd|src|lib|include|sys|msg)\//;    # prevent access to executables
162           
163           # create any directories
164           my @part = split /\//, $f[3];
165           my $part;
166           my $fn = "$main::root";
167           pop @part;         # remove last part
168           foreach $part (@part) {
169             $fn .= "/$part";
170                 next if -e $fn;
171             last SWITCH if !mkdir $fn, 0777;
172         dbg('msg', "created directory $fn\n");
173           }
174           my $stream = next_transno($f[2]);
175           my $ref = DXMsg->alloc($stream, "$main::root/$f[3]", $self->call, time, !$f[4], $f[3], ' ', '0');
176           
177           # forwarding variables
178       $ref->{fromnode} = $f[1];
179       $ref->{tonode} = $f[2];
180           $ref->{linesreq} = $f[5];
181           $ref->{stream} = $stream;
182           $ref->{count} = 0;                      # no of lines between PC31s
183           $ref->{file} = 1;
184       $work{"$f[1]$f[2]$stream"} = $ref;         # store in work
185           $self->send(DXProt::pc30($f[2], $f[1], $stream));  # send ack 
186           
187           last SWITCH;
188         }
189   }
190 }
191
192
193 # store a message away on disc or whatever
194 sub store
195 {
196   my $ref = shift;
197   my $lines = shift;
198   
199   # we only proceed if there are actually any lines in the file
200   if (@{$lines} == 0) {
201         return;
202   }
203   
204   if ($ref->{file}) {   # a file
205     dbg('msg', "To be stored in $ref->{to}\n");
206   
207     my $fh = new FileHandle "$ref->{to}", "w";
208         if (defined $fh) {
209           my $line;
210           foreach $line (@{$lines}) {
211                 print $fh "$line\n";
212           }
213           $fh->close;
214           dbg('msg', "file $ref->{to} stored\n");
215     } else {
216       confess "can't open file $ref->{to} $!";  
217     }
218 #       push @{$ref->{gotit}}, $ref->{fromnode} if $ref->{fromnode};
219   } else {              # a normal message
220
221     # attempt to open the message file
222         my $fn = filename($ref->{msgno});
223
224     dbg('msg', "To be stored in $fn\n");
225   
226     my $fh = new FileHandle "$fn", "w";
227         if (defined $fh) {
228       print $fh "=== $ref->{msgno}^$ref->{to}^$ref->{from}^$ref->{t}^$ref->{private}^$ref->{subject}^$ref->{origin}^$ref->{read}\n";
229           print $fh "=== $ref->{fromnode}\n";
230           my $line;
231           $ref->{size} = 0;
232           foreach $line (@{$lines}) {
233         $ref->{size} += (length $line) + 1;
234                 print $fh "$line\n";
235           }
236           $ref->{gotit} = [];
237           push @{$ref->{gotit}}, $ref->{fromnode} if $ref->{fromnode};
238           $fh->close;
239           dbg('msg', "msg $ref->{msgno} stored\n");
240     } else {
241       confess "can't open msg file $fn $!";  
242     }
243   }
244 }
245
246 # delete a message
247 sub del_msg
248 {
249   my $self = shift;
250
251   # remove it from the active message list
252   @msg = map { $_ != $self ? $_ : () } @msg;
253   
254   # remove the file
255   unlink filename($self->{msgno});
256 }
257
258 # read in a message header
259 sub read_msg_header
260
261   my $fn = shift;
262   my $file;
263   my $line;
264   my $ref;
265   my @f;
266   my $size;
267
268   $file = new FileHandle;
269   if (!open($file, $fn)) {
270     print "Error reading $fn $!\n";
271     return undef;
272   }
273   $size = -s $fn;
274   $line = <$file>;       # first line
275   chomp $line;
276   $size -= length $line;
277   if (! $line =~ /^===/o) {
278     print "corrupt first line in $fn ($line)\n";
279     return undef;
280   }
281   $line =~ s/^=== //o;
282   @f = split /\^/, $line;
283   $ref = DXMsg->alloc(@f);
284   
285   $line = <$file>;       # second line
286   chomp $line;
287   $size -= length $line;
288   if (! $line =~ /^===/o) {
289     print "corrupt second line in $fn ($line)\n";
290     return undef;
291   }
292   $line =~ s/^=== //o;
293   $ref->{gotit} = [];
294   @f = split /\^/, $line;
295   push @{$ref->{goit}}, @f;
296   $ref->{size} = $size;
297  
298   close($file);
299   
300   return $ref;
301 }
302
303 # read in a message header
304 sub read_msg_body
305 {
306   my $self = shift;
307   my $msgno = $self->{msgno};
308   my $file;
309   my $line;
310   my $fn = filename($msgno);
311   my @out;
312
313   $file = new FileHandle;
314   if (!open($file, $fn)) {
315     print "Error reading $fn $!\n";
316     return undef;
317   }
318   chomp (@out = <$file>);
319   close($file);
320   
321   shift @out if $out[0] =~ /^=== /;
322   shift @out if $out[0] =~ /^=== /;
323   return @out;
324 }
325
326 # get a new transaction number from the file specified
327 sub next_transno
328 {
329   my $name = shift;
330   $name =~ s/\W//og;      # remove non-word characters
331   my $fn = "$msgdir/$name";
332   my $msgno;
333   
334   my $fh = new FileHandle;
335   if (sysopen($fh, $fn, O_RDWR|O_CREAT, 0666)) {
336     $fh->autoflush(1);
337         $msgno = $fh->getline;
338         chomp $msgno;
339         $msgno++;
340         seek $fh, 0, 0;
341         $fh->print("$msgno\n");
342         dbg('msg', "msgno $msgno allocated for $name\n");
343         $fh->close;
344   } else {
345     confess "can't open $fn $!";
346   }
347   return $msgno;
348 }
349
350 # initialise the message 'system', read in all the message headers
351 sub init
352 {
353   my $dir = new FileHandle;
354   my @dir;
355   my $ref;
356
357   # read in the directory
358   opendir($dir, $msgdir) or confess "can't open $msgdir $!";
359   @dir = readdir($dir);
360   closedir($dir);
361   
362   for (sort @dir) {
363     next if /^\./o;
364         next if ! /^m\d+/o;
365
366     $ref = read_msg_header("$msgdir/$_");
367         next if !$ref;
368         
369         # add the clusters that have this
370         push @msg, $ref; 
371         
372   }
373 }
374
375 # return all the current messages
376 sub get_all
377 {
378   return @msg;
379 }
380
381 # get a particular message
382 sub get
383 {
384   my $msgno = shift;
385   for (@msg) {
386     return $_ if $_->{msgno} == $msgno;
387     last if $_->{msgno} > $msgno;
388   }
389   return undef;
390 }
391
392 # return the official filename for a message no
393 sub filename
394 {
395   return sprintf "$msgdir/m%06d", shift;
396 }
397
398 #
399 # return a list of valid elements 
400
401
402 sub fields
403 {
404   return keys(%valid);
405 }
406
407 #
408 # return a prompt for a field
409 #
410
411 sub field_prompt
412
413   my ($self, $ele) = @_;
414   return $valid{$ele};
415 }
416
417 no strict;
418 sub AUTOLOAD
419 {
420   my $self = shift;
421   my $name = $AUTOLOAD;
422   return if $name =~ /::DESTROY$/;
423   $name =~ s/.*:://o;
424   
425   confess "Non-existant field '$AUTOLOAD'" if !$valid{$name};
426   @_ ? $self->{$name} = shift : $self->{$name} ;
427 }
428
429
430 1;
431
432 __END__