bbda05cb5571c5c8718e28708c6456350a9a87d7
[spider.git] / perl / DXMsg.pm
1 #!/usr/bin/perl
2 #
3 # This module impliments the message handling for a dx cluster
4 #
5 # Copyright (c) 1998 Dirk Koopman G1TLH
6 #
7 # $Id$
8
9
10 package DXMsg;
11
12 @ISA = qw(DXProt DXChannel);
13
14 use DXUtil;
15 use DXChannel;
16 use DXUser;
17 use DXM;
18 use DXCluster;
19 use DXProtVars;
20 use DXProtout;
21 use DXDebug;
22 use DXLog;
23 use FileHandle;
24 use Carp;
25
26 use strict;
27 use vars qw(%work @msg $msgdir %valid %busy $maxage $last_clean);
28
29 %work = ();                # outstanding jobs
30 @msg = ();                 # messages we have
31 %busy = ();                # station interlocks
32 $msgdir = "$main::root/msg";              # directory contain the msgs
33 $maxage = 30 * 86400;      # the maximum age that a message shall live for if not marked 
34 $last_clean = 0;           # last time we did a clean
35
36 %valid = (
37   fromnode => '9,From Node',
38   tonode => '9,To Node',
39   to => '0,To',
40   from => '0,From',
41   t => '0,Msg Time,cldatetime',
42   private => '9,Private',
43   subject => '0,Subject',
44   linesreq => '0,Lines per Gob',
45   rrreq => '9,Read Confirm',
46   origin => '0,Origin',
47   lines => '5,Data',
48   stream => '9,Stream No',
49   count => '9,Gob Linecnt',
50   file => '9,File?,yesno',
51   gotit => '9,Got it Nodes,parray',
52   lines => '9,Lines,parray',
53   read => '9,Times read',
54   size => '0,Size',
55   msgno => '0,Msgno',
56   keep => '0,Keep this?,yesno',
57 );
58
59 # allocate a new object
60 # called fromnode, tonode, from, to, datetime, private?, subject, nolinesper  
61 sub alloc                  
62 {
63   my $pkg = shift;
64   my $self = bless {}, $pkg;
65   $self->{msgno} = shift;
66   $self->{to} = shift;
67   $self->{from} = shift;
68   $self->{t} = shift;
69   $self->{private} = shift;
70   $self->{subject} = shift;
71   $self->{origin} = shift;
72   $self->{read} = shift;
73   $self->{rrreq} = shift;
74   $self->{gotit} = [];
75     
76   return $self;
77 }
78
79 sub workclean
80 {
81   my $ref = shift;
82   delete $ref->{lines};
83   delete $ref->{linesreq};
84   delete $ref->{tonode};
85   delete $ref->{fromnode};
86   delete $ref->{stream};
87   delete $ref->{lines};
88   delete $ref->{file};
89   delete $ref->{count};
90 }
91
92 sub process
93 {
94   my ($self, $line) = @_;
95   my @f = split /[\^\~]/, $line;
96   my ($pcno) = $f[0] =~ /^PC(\d\d)/;          # just get the number
97   
98   SWITCH: {
99     if ($pcno == 28) {                        # incoming message
100           my $t = cltounix($f[5], $f[6]);
101           my $stream = next_transno($f[2]);
102           my $ref = DXMsg->alloc($stream, $f[3], $f[4], $t, $f[7], $f[8], $f[13], '0', $f[11]);
103           
104           # fill in various forwarding state variables
105       $ref->{fromnode} = $f[2];
106       $ref->{tonode} = $f[1];
107           $ref->{rrreq} = $f[11];
108           $ref->{linesreq} = $f[10];
109           $ref->{stream} = $stream;
110           $ref->{count} = 0;                      # no of lines between PC31s
111           dbg('msg', "new message from $f[4] to $f[3] '$f[8]' stream $stream\n");
112       $work{"$f[2]$stream"} = $ref;         # store in work
113           $busy{$f[2]} = $ref;                          # set interlock
114           $self->send(DXProt::pc30($f[2], $f[1], $stream)); # send ack
115           last SWITCH;
116         }
117         
118     if ($pcno == 29) {                        # incoming text
119           my $ref = $work{"$f[2]$f[3]"};
120           if ($ref) {
121             push @{$ref->{lines}}, $f[4];
122                 $ref->{count}++;
123                 if ($ref->{count} >= $ref->{linesreq}) {
124                   $self->send(DXProt::pc31($f[2], $f[1], $f[3]));
125                   dbg('msg', "stream $f[3]: $ref->{linereq} lines received\n");
126                   $ref->{count} = 0;
127                 }
128           }
129           last SWITCH;
130         }
131         
132     if ($pcno == 30) {                        # this is a incoming subject ack
133           my $ref = $work{$f[2]};          # note no stream at this stage
134           delete $work{$f[2]};
135           $ref->{stream} = $f[3];
136           $ref->{count} = 0;
137           $ref->{linesreq} = 5;
138           $work{"$f[2]$f[3]"} = $ref;        # new ref
139           dbg('msg', "incoming subject ack stream $f[3]\n");
140           $busy{$f[2]} = $ref;                       # interlock
141           $ref->{lines} = [];
142           push @{$ref->{lines}}, ($ref->read_msg_body);
143           $ref->send_tranche($self);
144           last SWITCH;
145         }
146         
147     if ($pcno == 31) {                        # acknowledge a tranche of lines
148           my $ref = $work{"$f[2]$f[3]"};
149           if ($ref) {
150             dbg('msg', "tranche ack stream $f[3]\n");
151             $ref->send_tranche($self);
152           } else {
153             $self->send(DXProt::pc42($f[2], $f[1], $f[3]));       # unknown stream
154           } 
155           last SWITCH;
156         }
157         
158     if ($pcno == 32) {                         # incoming EOM
159           dbg('msg', "stream $f[3]: EOM received\n");
160           my $ref = $work{"$f[2]$f[3]"};
161           if ($ref) {
162             $self->send(DXProt::pc33($f[2], $f[1], $f[3]));# acknowledge it
163                 
164                 # get the next msg no - note that this has NOTHING to do with the stream number in PC protocol
165                 # store the file or message
166                 # remove extraneous rubbish from the hash
167                 # remove it from the work in progress vector
168                 # stuff it on the msg queue
169                 if ($ref->{lines} && @{$ref->{lines}} > 0) {            # ignore messages with 0 lines
170                   $ref->{msgno} = next_transno("Msgno") if !$ref->{file};
171               push @{$ref->{gotit}}, $f[2];           # mark this up as being received
172                   $ref->store($ref->{lines});
173                   add_dir($ref);
174                   my $dxchan = DXChannel->get($ref->{to});
175                   $dxchan->send("New mail has arrived for you") if $dxchan;
176                   Log('msg', "Message $ref->{msgno} from $ref->{from} received from $f[2] for $ref->{to}");
177                 }
178                 $ref->stop_msg($self);
179                 queue_msg();
180           } else {
181             $self->send(DXProt::pc42($f[2], $f[1], $f[3]));       # unknown stream
182           }
183           queue_msg();
184           last SWITCH;
185         }
186         
187     if ($pcno == 33) {                         # acknowledge the end of message
188           my $ref = $work{"$f[2]$f[3]"};
189           if ($ref) {
190                 if ($ref->{private}) {                   # remove it if it private and gone off site#
191                         Log('msg', "Message $ref->{msgno} from $ref->{from} sent to $f[2] and deleted");
192                         $ref->del_msg;
193             } else {
194                         Log('msg', "Message $ref->{msgno} from $ref->{from} sent to $f[2]");
195                         push @{$ref->{gotit}}, $f[2];           # mark this up as being received
196                         $ref->store($ref->{lines});             # re- store the file
197             }
198         $ref->stop_msg($self);
199           } else {
200             $self->send(DXProt::pc42($f[2], $f[1], $f[3]));       # unknown stream
201           } 
202           queue_msg();
203           last SWITCH;
204         }
205         
206         if ($pcno == 40) {                         # this is a file request
207           $f[3] =~ s/\\/\//og;                     # change the slashes
208           $f[3] =~ s/\.//og;                       # remove dots
209           $f[3] = lc $f[3];                        # to lower case;
210           dbg('msg', "incoming file $f[3]\n");
211           last SWITCH if $f[3] =~ /^\/(perl|cmd|local_cmd|src|lib|include|sys|msg)\//;    # prevent access to executables
212           
213           # create any directories
214           my @part = split /\//, $f[3];
215           my $part;
216           my $fn = "$main::root";
217           pop @part;         # remove last part
218           foreach $part (@part) {
219             $fn .= "/$part";
220                 next if -e $fn;
221             last SWITCH if !mkdir $fn, 0777;
222         dbg('msg', "created directory $fn\n");
223           }
224           my $stream = next_transno($f[2]);
225           my $ref = DXMsg->alloc($stream, "$main::root/$f[3]", $self->call, time, !$f[4], $f[3], ' ', '0', '0');
226           
227           # forwarding variables
228       $ref->{fromnode} = $f[1];
229       $ref->{tonode} = $f[2];
230           $ref->{linesreq} = $f[5];
231           $ref->{stream} = $stream;
232           $ref->{count} = 0;                      # no of lines between PC31s
233           $ref->{file} = 1;
234       $work{"$f[2]$stream"} = $ref;         # store in work
235           $self->send(DXProt::pc30($f[2], $f[1], $stream));  # send ack 
236           
237           last SWITCH;
238         }
239         
240         if ($pcno == 42) {                        # abort transfer
241           dbg('msg', "stream $f[3]: abort received\n");
242           my $ref = $work{"$f[2]$f[3]"};
243           if ($ref) {
244             $ref->stop_msg($self);
245                 $ref = undef;
246           }
247           
248           last SWITCH;
249         }
250   }
251
252   clean_old() if $main::systime - $last_clean > 3600 ;    # clean the message queue
253 }
254
255
256 # store a message away on disc or whatever
257 #
258 # NOTE the second arg is a REFERENCE not a list
259 sub store
260 {
261   my $ref = shift;
262   my $lines = shift;
263   
264   # we only proceed if there are actually any lines in the file
265   if (!$lines || @{$lines} == 0) {
266         return;
267   }
268   
269   if ($ref->{file}) {   # a file
270     dbg('msg', "To be stored in $ref->{to}\n");
271   
272     my $fh = new FileHandle "$ref->{to}", "w";
273         if (defined $fh) {
274           my $line;
275           foreach $line (@{$lines}) {
276                 print $fh "$line\n";
277           }
278           $fh->close;
279           dbg('msg', "file $ref->{to} stored\n");
280           Log('msg', "file $ref->{to} from $ref->{from} stored" );
281     } else {
282       confess "can't open file $ref->{to} $!";  
283     }
284 #       push @{$ref->{gotit}}, $ref->{fromnode} if $ref->{fromnode};
285   } else {              # a normal message
286
287     # attempt to open the message file
288         my $fn = filename($ref->{msgno});
289
290     dbg('msg', "To be stored in $fn\n");
291     
292         # now save the file, overwriting what's there, YES I KNOW OK! (I will change it if it's a problem)
293     my $fh = new FileHandle "$fn", "w";
294         if (defined $fh) {
295           my $rr = $ref->{rrreq} ? '1' : '0';
296           my $priv = $ref->{private} ? '1': '0';
297       print $fh "=== $ref->{msgno}^$ref->{to}^$ref->{from}^$ref->{t}^$priv^$ref->{subject}^$ref->{origin}^$ref->{read}^$rr\n";
298           print $fh "=== ", join('^', @{$ref->{gotit}}), "\n";
299           my $line;
300           $ref->{size} = 0;
301           foreach $line (@{$lines}) {
302         $ref->{size} += (length $line) + 1;
303                 print $fh "$line\n";
304           }
305           $fh->close;
306           dbg('msg', "msg $ref->{msgno} stored\n");
307           Log('msg', "msg $ref->{msgno} from $ref->{from} to $ref->{to} stored" );
308     } else {
309       confess "can't open msg file $fn $!";  
310     }
311   }
312 }
313
314 # delete a message
315 sub del_msg
316 {
317   my $self = shift;
318
319   # remove it from the active message list
320   @msg = map { $_ != $self ? $_ : () } @msg;
321   
322   # belt and braces (one day I will ask someone if this is REALLY necessary)
323   delete $self->{gotit};
324   delete $self->{list};
325   
326   # remove the file
327   unlink filename($self->{msgno});
328   dbg('msg', "deleting $self->{msgno}\n");
329 }
330
331 # clean out old messages from the message queue
332 sub clean_old
333 {
334         my $ref;
335         
336         # mark old messages for deletion
337         foreach $ref (@msg) {
338                 if (!$ref->{keep} && $ref->{t} < $main::systime - $maxage) {
339                         $ref->{deleteme} = 1;
340                         delete $ref->{gotit};
341                         delete $ref->{list};
342                         unlink filename($ref->{msgno});
343                         dbg('msg', "deleting old $ref->{msgno}\n");
344                 }
345         }
346
347         # remove them all from the active message list
348         @msg = map { $_->{deleteme} ? () : $_ } @msg;
349         $last_clean = $main::systime;
350 }
351
352 # read in a message header
353 sub read_msg_header
354
355   my $fn = shift;
356   my $file;
357   my $line;
358   my $ref;
359   my @f;
360   my $size;
361
362   $file = new FileHandle;
363   if (!open($file, $fn)) {
364     print "Error reading $fn $!\n";
365     return undef;
366   }
367   $size = -s $fn;
368   $line = <$file>;       # first line
369   chomp $line;
370   $size -= length $line;
371   if (! $line =~ /^===/o) {
372     print "corrupt first line in $fn ($line)\n";
373     return undef;
374   }
375   $line =~ s/^=== //o;
376   @f = split /\^/, $line;
377   $ref = DXMsg->alloc(@f);
378   
379   $line = <$file>;       # second line
380   chomp $line;
381   $size -= length $line;
382   if (! $line =~ /^===/o) {
383     print "corrupt second line in $fn ($line)\n";
384     return undef;
385   }
386   $line =~ s/^=== //o;
387   $ref->{gotit} = [];
388   @f = split /\^/, $line;
389   push @{$ref->{gotit}}, @f;
390   $ref->{size} = $size;
391  
392   close($file);
393   
394   return $ref;
395 }
396
397 # read in a message header
398 sub read_msg_body
399 {
400   my $self = shift;
401   my $msgno = $self->{msgno};
402   my $file;
403   my $line;
404   my $fn = filename($msgno);
405   my @out;
406
407   $file = new FileHandle;
408   if (!open($file, $fn)) {
409     print "Error reading $fn $!\n";
410     return undef;
411   }
412   chomp (@out = <$file>);
413   close($file);
414   
415   shift @out if $out[0] =~ /^=== /;
416   shift @out if $out[0] =~ /^=== /;
417   return @out;
418 }
419
420 # send a tranche of lines to the other end
421 sub send_tranche
422 {
423   my ($self, $dxchan) = @_;
424   my @out;
425   my $to = $self->{tonode};
426   my $from = $self->{fromnode};
427   my $stream = $self->{stream};
428   my $i;
429   
430   for ($i = 0; $i < $self->{linesreq} && $self->{count} < @{$self->{lines}}; $i++, $self->{count}++) {
431     push @out, DXProt::pc29($to, $from, $stream, ${$self->{lines}}[$self->{count}]);
432   }
433   push @out, DXProt::pc32($to, $from, $stream) if $i < $self->{linesreq};
434   $dxchan->send(@out);
435 }
436
437
438 # find a message to send out and start the ball rolling
439 sub queue_msg
440 {
441   my $sort = shift;
442   my @nodelist = DXProt::get_all_ak1a();
443   my $ref;
444   my $clref;
445   my $dxchan;
446   
447   # bat down the message list looking for one that needs to go off site and whose
448   # nearest node is not busy.
449
450   dbg('msg', "queue msg ($sort)\n");
451   foreach $ref (@msg) {
452     # firstly, is it private and unread? if so can I find the recipient
453         # in my cluster node list offsite?
454         if ($ref->{private}) {
455           if ($ref->{read} == 0) {
456             $clref = DXCluster->get($ref->{to});
457                 if ($clref && !grep { $clref->{dxchan} == $_ } DXCommandmode::get_all) {
458                   $dxchan = $clref->{dxchan};
459                   $ref->start_msg($dxchan) if $clref && !get_busy($dxchan->call);
460                 }
461           }
462         } elsif ($sort == undef) {
463       # otherwise we are dealing with a bulletin, compare the gotit list with
464           # the nodelist up above, if there are sites that haven't got it yet
465           # then start sending it - what happens when we get loops is anyone's
466           # guess, use (to, from, time, subject) tuple?
467           my $noderef;
468           foreach $noderef (@nodelist) {
469             next if $noderef->call eq $main::mycall;
470                 next if grep { $_ eq $noderef->call } @{$ref->{gotit}};
471                 
472                 # if we are here we have a node that doesn't have this message
473                 $ref->start_msg($noderef) if !get_busy($noderef->call);
474                 last;
475           } 
476         }
477         
478         # if all the available nodes are busy then stop
479         last if @nodelist == scalar grep { get_busy($_->call) } @nodelist;
480   }
481 }
482
483 # start the message off on its travels with a PC28
484 sub start_msg
485 {
486   my ($self, $dxchan) = @_;
487
488   dbg('msg', "start msg $self->{msgno}\n");
489   $self->{linesreq} = 5;
490   $self->{count} = 0;
491   $self->{tonode} = $dxchan->call;
492   $self->{fromnode} = $main::mycall;
493   $busy{$dxchan->call} = $self;
494   $work{"$self->{tonode}"} = $self;
495   $dxchan->send(DXProt::pc28($self->{tonode}, $self->{fromnode}, $self->{to}, $self->{from}, $self->{t}, $self->{private}, $self->{subject}, $self->{origin}, $self->{rrreq}));
496 }
497
498 # get the ref of a busy node
499 sub get_busy
500 {
501   my $call = shift;
502   return $busy{$call};
503 }
504
505 # get the busy queue
506 sub get_all_busy
507 {
508   return values %busy;
509 }
510
511 # get the forwarding queue
512 sub get_fwq
513 {
514   return values %work;
515 }
516
517 # stop a message from continuing, clean it out, unlock interlocks etc
518 sub stop_msg
519 {
520   my ($self, $dxchan) = @_;
521   my $node = $dxchan->call;
522
523   dbg('msg', "stop msg $self->{msgno} stream $self->{stream}\n");
524   delete $work{$node};
525   delete $work{"$node$self->{stream}"};
526   $self->workclean;
527   delete $busy{$node};
528 }
529
530 # get a new transaction number from the file specified
531 sub next_transno
532 {
533   my $name = shift;
534   $name =~ s/\W//og;      # remove non-word characters
535   my $fn = "$msgdir/$name";
536   my $msgno;
537   
538   my $fh = new FileHandle;
539   if (sysopen($fh, $fn, O_RDWR|O_CREAT, 0666)) {
540     $fh->autoflush(1);
541         $msgno = $fh->getline;
542         chomp $msgno;
543         $msgno++;
544         seek $fh, 0, 0;
545         $fh->print("$msgno\n");
546         dbg('msg', "msgno $msgno allocated for $name\n");
547         $fh->close;
548   } else {
549     confess "can't open $fn $!";
550   }
551   return $msgno;
552 }
553
554 # initialise the message 'system', read in all the message headers
555 sub init
556 {
557   my $dir = new FileHandle;
558   my @dir;
559   my $ref;
560
561   # read in the directory
562   opendir($dir, $msgdir) or confess "can't open $msgdir $!";
563   @dir = readdir($dir);
564   closedir($dir);
565   
566   for (sort @dir) {
567     next if /^\./o;
568         next if ! /^m\d+/o;
569
570     $ref = read_msg_header("$msgdir/$_");
571         next if !$ref;
572         
573         # add the message to the available queue
574         add_dir($ref); 
575         
576   }
577 }
578
579 # add the message to the directory listing
580 sub add_dir
581 {
582   my $ref = shift;
583   confess "tried to add a non-ref to the msg directory" if !ref $ref;
584   push @msg, $ref;
585 }
586
587 # return all the current messages
588 sub get_all
589 {
590   return @msg;
591 }
592
593 # get a particular message
594 sub get
595 {
596   my $msgno = shift;
597   for (@msg) {
598     return $_ if $_->{msgno} == $msgno;
599     last if $_->{msgno} > $msgno;
600   }
601   return undef;
602 }
603
604 # return the official filename for a message no
605 sub filename
606 {
607   return sprintf "$msgdir/m%06d", shift;
608 }
609
610 #
611 # return a list of valid elements 
612
613
614 sub fields
615 {
616   return keys(%valid);
617 }
618
619 #
620 # return a prompt for a field
621 #
622
623 sub field_prompt
624
625   my ($self, $ele) = @_;
626   return $valid{$ele};
627 }
628
629 #
630 # send a message state machine
631 sub do_send_stuff
632 {
633   my $self = shift;
634   my $line = shift;
635   my @out;
636   
637   if ($self->state eq 'send1') {
638 #  $DB::single = 1;
639     confess "local var gone missing" if !ref $self->{loc};
640         my $loc = $self->{loc};
641         $loc->{subject} = $line;
642         $loc->{lines} = [];
643         $self->state('sendbody');
644         #push @out, $self->msg('sendbody');
645         push @out, "Enter Message /EX (^Z) to send or /ABORT (^Y) to exit";
646   } elsif ($self->state eq 'sendbody') {
647     confess "local var gone missing" if !ref $self->{loc};
648         my $loc = $self->{loc};
649         if ($line eq "\032" || uc $line eq "/EX") {
650       my $to;
651
652       if (@{$loc->{lines}} > 0) {
653             foreach $to (@{$loc->{to}}) {
654               my $ref;
655                   my $systime = $main::systime;
656                   my $mycall = $main::mycall;
657                   $ref = DXMsg->alloc(DXMsg::next_transno('Msgno'),
658                                 uc $to,
659                                                         $self->call, 
660                                                         $systime,
661                                                         $loc->{private}, 
662                                                         $loc->{subject}, 
663                                                         $mycall,
664                                                         '0',
665                                                         $loc->{rrreq});
666               $ref->store($loc->{lines});
667                   $ref->add_dir();
668                   #push @out, $self->msg('sendsent', $to);
669                   push @out, "msgno $ref->{msgno} sent to $to";
670                   my $dxchan = DXChannel->get(uc $to);
671                   $dxchan->send("New mail has arrived for you") if $dxchan;
672             }
673           }
674           delete $loc->{lines};
675           delete $loc->{to};
676           delete $self->{loc};
677           $self->state('prompt');
678           $self->func(undef);
679           DXMsg::queue_msg();
680     } elsif ($line eq "\031" || uc $line eq "/ABORT" || uc $line eq "/QUIT") {
681       #push @out, $self->msg('sendabort');
682           push @out, "aborted";
683           delete $loc->{lines};
684           delete $loc->{to};
685           delete $self->{loc};
686           $self->func(undef);
687           $self->state('prompt');
688     } else {
689   
690       # i.e. it ain't and end or abort, therefore store the line
691       push @{$loc->{lines}}, $line;
692     }
693   }
694   return (1, @out);
695 }
696
697 # return the standard directory line for this ref 
698 sub dir
699 {
700   my $ref = shift;
701   return sprintf "%6d%s%s%5d %8.8s %8.8s %-6.6s %5.5s %-30.30s", 
702     $ref->msgno, $ref->read ? '-' : ' ', $ref->private ? 'p' : ' ', $ref->size,
703         $ref->to, $ref->from, cldate($ref->t), ztime($ref->t), $ref->subject;
704 }
705
706 no strict;
707 sub AUTOLOAD
708 {
709   my $self = shift;
710   my $name = $AUTOLOAD;
711   return if $name =~ /::DESTROY$/;
712   $name =~ s/.*:://o;
713   
714   confess "Non-existant field '$AUTOLOAD'" if !$valid{$name};
715   @_ ? $self->{$name} = shift : $self->{$name} ;
716 }
717
718 1;
719
720 __END__