9316c374444ca0eff24087994b181f17c1fb7978
[spider.git] / perl / DXMsg.pm
1 #!/usr/bin/perl
2 #
3 # This module impliments the message handling for a dx cluster
4 #
5 # Copyright (c) 1998 Dirk Koopman G1TLH
6 #
7 # $Id$
8
9
10 package DXMsg;
11
12 @ISA = qw(DXProt DXChannel);
13
14 use DXUtil;
15 use DXChannel;
16 use DXUser;
17 use DXM;
18 use DXCluster;
19 use DXProtVars;
20 use DXProtout;
21 use DXDebug;
22 use FileHandle;
23 use Carp;
24
25 use strict;
26 use vars qw(%work @msg $msgdir %valid %busy);
27
28 %work = ();                # outstanding jobs
29 @msg = ();                 # messages we have
30 %busy = ();                # station interlocks
31 $msgdir = "$main::root/msg";              # directory contain the msgs
32
33 %valid = (
34   fromnode => '9,From Node',
35   tonode => '9,To Node',
36   to => '0,To',
37   from => '0,From',
38   t => '0,Msg Time,cldatetime',
39   private => '9,Private',
40   subject => '0,Subject',
41   linesreq => '0,Lines per Gob',
42   rrreq => '9,Read Confirm',
43   origin => '0,Origin',
44   lines => '5,Data',
45   stream => '9,Stream No',
46   count => '9,Gob Linecnt',
47   file => '9,File?,yesno',
48   gotit => '9,Got it Nodes,parray',
49   lines => '9,Lines,parray',
50   read => '9,Times read',
51   size => '0,Size',
52   msgno => '0,Msgno',
53 );
54
55 # allocate a new object
56 # called fromnode, tonode, from, to, datetime, private?, subject, nolinesper  
57 sub alloc                  
58 {
59   my $pkg = shift;
60   my $self = bless {}, $pkg;
61   $self->{msgno} = shift;
62   $self->{to} = shift;
63   $self->{from} = shift;
64   $self->{t} = shift;
65   $self->{private} = shift;
66   $self->{subject} = shift;
67   $self->{origin} = shift;
68   $self->{read} = shift;
69   $self->{rrreq} = shift;
70   $self->{gotit} = [];
71     
72   return $self;
73 }
74
75 sub workclean
76 {
77   my $ref = shift;
78   delete $ref->{lines};
79   delete $ref->{linesreq};
80   delete $ref->{tonode};
81   delete $ref->{fromnode};
82   delete $ref->{stream};
83   delete $ref->{lines};
84   delete $ref->{file};
85   delete $ref->{count};
86 }
87
88 sub process
89 {
90   my ($self, $line) = @_;
91   my @f = split /[\^\~]/, $line;
92   my ($pcno) = $f[0] =~ /^PC(\d\d)/;          # just get the number
93   
94   SWITCH: {
95     if ($pcno == 28) {                        # incoming message
96           my $t = cltounix($f[5], $f[6]);
97           my $stream = next_transno($f[2]);
98           my $ref = DXMsg->alloc($stream, $f[3], $f[4], $t, $f[7], $f[8], $f[13], '0', $f[11]);
99           
100           # fill in various forwarding state variables
101       $ref->{fromnode} = $f[2];
102       $ref->{tonode} = $f[1];
103           $ref->{rrreq} = $f[11];
104           $ref->{linesreq} = $f[10];
105           $ref->{stream} = $stream;
106           $ref->{count} = 0;                      # no of lines between PC31s
107           dbg('msg', "new message from $f[4] to $f[3] '$f[8]' stream $stream\n");
108       $work{"$f[2]$stream"} = $ref;         # store in work
109           $busy{$f[2]} = $ref;                          # set interlock
110           $self->send(DXProt::pc30($f[2], $f[1], $stream)); # send ack
111           last SWITCH;
112         }
113         
114     if ($pcno == 29) {                        # incoming text
115           my $ref = $work{"$f[2]$f[3]"};
116           if ($ref) {
117             push @{$ref->{lines}}, $f[4];
118                 $ref->{count}++;
119                 if ($ref->{count} >= $ref->{linesreq}) {
120                   $self->send(DXProt::pc31($f[2], $f[1], $f[3]));
121                   dbg('msg', "stream $f[3]: $ref->{linereq} lines received\n");
122                   $ref->{count} = 0;
123                 }
124           }
125           last SWITCH;
126         }
127         
128     if ($pcno == 30) {                        # this is a incoming subject ack
129           my $ref = $work{$f[2]};          # note no stream at this stage
130           delete $work{$f[2]};
131           $ref->{stream} = $f[3];
132           $ref->{count} = 0;
133           $ref->{linesreq} = 5;
134           $work{"$f[2]$f[3]"} = $ref;        # new ref
135           dbg('msg', "incoming subject ack stream $f[3]\n");
136           $busy{$f[2]} = $ref;                       # interlock
137           $ref->{lines} = [];
138           push @{$ref->{lines}}, ($ref->read_msg_body);
139           $ref->send_tranche($self);
140           last SWITCH;
141         }
142         
143     if ($pcno == 31) {                        # acknowledge a tranche of lines
144           my $ref = $work{"$f[2]$f[3]"};
145           if ($ref) {
146             dbg('msg', "tranche ack stream $f[3]\n");
147             $ref->send_tranche($self);
148           } else {
149             $self->send(DXProt::pc42($f[2], $f[1], $f[3]));       # unknown stream
150           } 
151           last SWITCH;
152         }
153         
154     if ($pcno == 32) {                         # incoming EOM
155           dbg('msg', "stream $f[3]: EOM received\n");
156           my $ref = $work{"$f[2]$f[3]"};
157           if ($ref) {
158             $self->send(DXProt::pc33($f[2], $f[1], $f[3]));# acknowledge it
159                 
160                 # get the next msg no - note that this has NOTHING to do with the stream number in PC protocol
161                 # store the file or message
162                 # remove extraneous rubbish from the hash
163                 # remove it from the work in progress vector
164                 # stuff it on the msg queue
165                 if ($ref->{lines} && @{$ref->{lines}} > 0) {            # ignore messages with 0 lines
166                   $ref->{msgno} = next_transno("Msgno") if !$ref->{file};
167               push @{$ref->{gotit}}, $f[2];           # mark this up as being received
168                   $ref->store($ref->{lines});
169                   add_dir($ref);
170                   my $dxchan = DXChannel->get($ref->{to});
171                   $dxchan->send("New mail has arrived for you") if $dxchan;
172                 }
173                 $ref->stop_msg($self);
174                 queue_msg();
175           } else {
176             $self->send(DXProt::pc42($f[2], $f[1], $f[3]));       # unknown stream
177           }
178           queue_msg();
179           last SWITCH;
180         }
181         
182     if ($pcno == 33) {                         # acknowledge the end of message
183           my $ref = $work{"$f[2]$f[3]"};
184           if ($ref) {
185                 if ($ref->{private}) {                   # remove it if it private and gone off site#
186               $ref->del_msg;
187             } else {
188               push @{$ref->{gotit}}, $f[2];           # mark this up as being received
189                   $ref->store($ref->{lines});             # re- store the file
190             }
191         $ref->stop_msg($self);
192           } else {
193             $self->send(DXProt::pc42($f[2], $f[1], $f[3]));       # unknown stream
194           } 
195           queue_msg();
196           last SWITCH;
197         }
198         
199         if ($pcno == 40) {                         # this is a file request
200           $f[3] =~ s/\\/\//og;                     # change the slashes
201           $f[3] =~ s/\.//og;                       # remove dots
202           $f[3] = lc $f[3];                        # to lower case;
203           dbg('msg', "incoming file $f[3]\n");
204           last SWITCH if $f[3] =~ /^\/(perl|cmd|local_cmd|src|lib|include|sys|msg)\//;    # prevent access to executables
205           
206           # create any directories
207           my @part = split /\//, $f[3];
208           my $part;
209           my $fn = "$main::root";
210           pop @part;         # remove last part
211           foreach $part (@part) {
212             $fn .= "/$part";
213                 next if -e $fn;
214             last SWITCH if !mkdir $fn, 0777;
215         dbg('msg', "created directory $fn\n");
216           }
217           my $stream = next_transno($f[2]);
218           my $ref = DXMsg->alloc($stream, "$main::root/$f[3]", $self->call, time, !$f[4], $f[3], ' ', '0', '0');
219           
220           # forwarding variables
221       $ref->{fromnode} = $f[1];
222       $ref->{tonode} = $f[2];
223           $ref->{linesreq} = $f[5];
224           $ref->{stream} = $stream;
225           $ref->{count} = 0;                      # no of lines between PC31s
226           $ref->{file} = 1;
227       $work{"$f[2]$stream"} = $ref;         # store in work
228           $self->send(DXProt::pc30($f[2], $f[1], $stream));  # send ack 
229           
230           last SWITCH;
231         }
232         
233         if ($pcno == 42) {                        # abort transfer
234           dbg('msg', "stream $f[3]: abort received\n");
235           my $ref = $work{"$f[2]$f[3]"};
236           if ($ref) {
237             $ref->stop_msg($self);
238                 $ref = undef;
239           }
240           
241           last SWITCH;
242         }
243   }
244 }
245
246
247 # store a message away on disc or whatever
248 #
249 # NOTE the second arg is a REFERENCE not a list
250 sub store
251 {
252   my $ref = shift;
253   my $lines = shift;
254   
255   # we only proceed if there are actually any lines in the file
256   if (!$lines || @{$lines} == 0) {
257         return;
258   }
259   
260   if ($ref->{file}) {   # a file
261     dbg('msg', "To be stored in $ref->{to}\n");
262   
263     my $fh = new FileHandle "$ref->{to}", "w";
264         if (defined $fh) {
265           my $line;
266           foreach $line (@{$lines}) {
267                 print $fh "$line\n";
268           }
269           $fh->close;
270           dbg('msg', "file $ref->{to} stored\n");
271     } else {
272       confess "can't open file $ref->{to} $!";  
273     }
274 #       push @{$ref->{gotit}}, $ref->{fromnode} if $ref->{fromnode};
275   } else {              # a normal message
276
277     # attempt to open the message file
278         my $fn = filename($ref->{msgno});
279
280     dbg('msg', "To be stored in $fn\n");
281     
282         # now save the file, overwriting what's there, YES I KNOW OK! (I will change it if it's a problem)
283     my $fh = new FileHandle "$fn", "w";
284         if (defined $fh) {
285           my $rr = $ref->{rrreq} ? '1' : '0';
286           my $priv = $ref->{private} ? '1': '0';
287       print $fh "=== $ref->{msgno}^$ref->{to}^$ref->{from}^$ref->{t}^$priv^$ref->{subject}^$ref->{origin}^$ref->{read}^$rr\n";
288           print $fh "=== ", join('^', @{$ref->{gotit}}), "\n";
289           my $line;
290           $ref->{size} = 0;
291           foreach $line (@{$lines}) {
292         $ref->{size} += (length $line) + 1;
293                 print $fh "$line\n";
294           }
295           $fh->close;
296           dbg('msg', "msg $ref->{msgno} stored\n");
297     } else {
298       confess "can't open msg file $fn $!";  
299     }
300   }
301 }
302
303 # delete a message
304 sub del_msg
305 {
306   my $self = shift;
307
308   # remove it from the active message list
309   @msg = map { $_ != $self ? $_ : () } @msg;
310   
311   # belt and braces (one day I will ask someone if this is REALLY necessary)
312   delete $self->{gotit};
313   delete $self->{list};
314   
315   # remove the file
316   unlink filename($self->{msgno});
317   dbg('msg', "deleting $self->{msgno}\n");
318 }
319
320 # read in a message header
321 sub read_msg_header
322
323   my $fn = shift;
324   my $file;
325   my $line;
326   my $ref;
327   my @f;
328   my $size;
329
330   $file = new FileHandle;
331   if (!open($file, $fn)) {
332     print "Error reading $fn $!\n";
333     return undef;
334   }
335   $size = -s $fn;
336   $line = <$file>;       # first line
337   chomp $line;
338   $size -= length $line;
339   if (! $line =~ /^===/o) {
340     print "corrupt first line in $fn ($line)\n";
341     return undef;
342   }
343   $line =~ s/^=== //o;
344   @f = split /\^/, $line;
345   $ref = DXMsg->alloc(@f);
346   
347   $line = <$file>;       # second line
348   chomp $line;
349   $size -= length $line;
350   if (! $line =~ /^===/o) {
351     print "corrupt second line in $fn ($line)\n";
352     return undef;
353   }
354   $line =~ s/^=== //o;
355   $ref->{gotit} = [];
356   @f = split /\^/, $line;
357   push @{$ref->{gotit}}, @f;
358   $ref->{size} = $size;
359  
360   close($file);
361   
362   return $ref;
363 }
364
365 # read in a message header
366 sub read_msg_body
367 {
368   my $self = shift;
369   my $msgno = $self->{msgno};
370   my $file;
371   my $line;
372   my $fn = filename($msgno);
373   my @out;
374
375   $file = new FileHandle;
376   if (!open($file, $fn)) {
377     print "Error reading $fn $!\n";
378     return undef;
379   }
380   chomp (@out = <$file>);
381   close($file);
382   
383   shift @out if $out[0] =~ /^=== /;
384   shift @out if $out[0] =~ /^=== /;
385   return @out;
386 }
387
388 # send a tranche of lines to the other end
389 sub send_tranche
390 {
391   my ($self, $dxchan) = @_;
392   my @out;
393   my $to = $self->{tonode};
394   my $from = $self->{fromnode};
395   my $stream = $self->{stream};
396   my $i;
397   
398   for ($i = 0; $i < $self->{linesreq} && $self->{count} < @{$self->{lines}}; $i++, $self->{count}++) {
399     push @out, DXProt::pc29($to, $from, $stream, ${$self->{lines}}[$self->{count}]);
400   }
401   push @out, DXProt::pc32($to, $from, $stream) if $i < $self->{linesreq};
402   $dxchan->send(@out);
403 }
404
405
406 # find a message to send out and start the ball rolling
407 sub queue_msg
408 {
409   my $sort = shift;
410   my @nodelist = DXProt::get_all_ak1a();
411   my $ref;
412   my $clref;
413   my $dxchan;
414   
415   # bat down the message list looking for one that needs to go off site and whose
416   # nearest node is not busy.
417
418   dbg('msg', "queue msg ($sort)\n");
419   foreach $ref (@msg) {
420     # firstly, is it private and unread? if so can I find the recipient
421         # in my cluster node list offsite?
422         if ($ref->{private}) {
423           if ($ref->{read} == 0) {
424             $clref = DXCluster->get($ref->{to});
425                 if ($clref && !grep { $clref->{dxchan} == $_ } DXCommandmode::get_all) {
426                   $dxchan = $clref->{dxchan};
427                   $ref->start_msg($dxchan) if $clref && !get_busy($dxchan->call);
428                 }
429           }
430         } elsif ($sort == undef) {
431       # otherwise we are dealing with a bulletin, compare the gotit list with
432           # the nodelist up above, if there are sites that haven't got it yet
433           # then start sending it - what happens when we get loops is anyone's
434           # guess, use (to, from, time, subject) tuple?
435           my $noderef;
436           foreach $noderef (@nodelist) {
437             next if $noderef->call eq $main::mycall;
438                 next if grep { $_ eq $noderef->call } @{$ref->{gotit}};
439                 
440                 # if we are here we have a node that doesn't have this message
441                 $ref->start_msg($noderef) if !get_busy($noderef->call);
442                 last;
443           } 
444         }
445         
446         # if all the available nodes are busy then stop
447         last if @nodelist == scalar grep { get_busy($_->call) } @nodelist;
448   }
449 }
450
451 # start the message off on its travels with a PC28
452 sub start_msg
453 {
454   my ($self, $dxchan) = @_;
455
456   dbg('msg', "start msg $self->{msgno}\n");
457   $self->{linesreq} = 5;
458   $self->{count} = 0;
459   $self->{tonode} = $dxchan->call;
460   $self->{fromnode} = $main::mycall;
461   $busy{$dxchan->call} = $self;
462   $work{"$self->{tonode}"} = $self;
463   $dxchan->send(DXProt::pc28($self->{tonode}, $self->{fromnode}, $self->{to}, $self->{from}, $self->{t}, $self->{private}, $self->{subject}, $self->{origin}, $self->{rrreq}));
464 }
465
466 # get the ref of a busy node
467 sub get_busy
468 {
469   my $call = shift;
470   return $busy{$call};
471 }
472
473 # get the busy queue
474 sub get_all_busy
475 {
476   return values %busy;
477 }
478
479 # get the forwarding queue
480 sub get_fwq
481 {
482   return values %work;
483 }
484
485 # stop a message from continuing, clean it out, unlock interlocks etc
486 sub stop_msg
487 {
488   my ($self, $dxchan) = @_;
489   my $node = $dxchan->call;
490
491   dbg('msg', "stop msg $self->{msgno} stream $self->{stream}\n");
492   delete $work{$node};
493   delete $work{"$node$self->{stream}"};
494   $self->workclean;
495   delete $busy{$node};
496 }
497
498 # get a new transaction number from the file specified
499 sub next_transno
500 {
501   my $name = shift;
502   $name =~ s/\W//og;      # remove non-word characters
503   my $fn = "$msgdir/$name";
504   my $msgno;
505   
506   my $fh = new FileHandle;
507   if (sysopen($fh, $fn, O_RDWR|O_CREAT, 0666)) {
508     $fh->autoflush(1);
509         $msgno = $fh->getline;
510         chomp $msgno;
511         $msgno++;
512         seek $fh, 0, 0;
513         $fh->print("$msgno\n");
514         dbg('msg', "msgno $msgno allocated for $name\n");
515         $fh->close;
516   } else {
517     confess "can't open $fn $!";
518   }
519   return $msgno;
520 }
521
522 # initialise the message 'system', read in all the message headers
523 sub init
524 {
525   my $dir = new FileHandle;
526   my @dir;
527   my $ref;
528
529   # read in the directory
530   opendir($dir, $msgdir) or confess "can't open $msgdir $!";
531   @dir = readdir($dir);
532   closedir($dir);
533   
534   for (sort @dir) {
535     next if /^\./o;
536         next if ! /^m\d+/o;
537
538     $ref = read_msg_header("$msgdir/$_");
539         next if !$ref;
540         
541         # add the message to the available queue
542         add_dir($ref); 
543         
544   }
545 }
546
547 # add the message to the directory listing
548 sub add_dir
549 {
550   my $ref = shift;
551   confess "tried to add a non-ref to the msg directory" if !ref $ref;
552   push @msg, $ref;
553 }
554
555 # return all the current messages
556 sub get_all
557 {
558   return @msg;
559 }
560
561 # get a particular message
562 sub get
563 {
564   my $msgno = shift;
565   for (@msg) {
566     return $_ if $_->{msgno} == $msgno;
567     last if $_->{msgno} > $msgno;
568   }
569   return undef;
570 }
571
572 # return the official filename for a message no
573 sub filename
574 {
575   return sprintf "$msgdir/m%06d", shift;
576 }
577
578 #
579 # return a list of valid elements 
580
581
582 sub fields
583 {
584   return keys(%valid);
585 }
586
587 #
588 # return a prompt for a field
589 #
590
591 sub field_prompt
592
593   my ($self, $ele) = @_;
594   return $valid{$ele};
595 }
596
597 #
598 # send a message state machine
599 sub do_send_stuff
600 {
601   my $self = shift;
602   my $line = shift;
603   my @out;
604   
605   if ($self->state eq 'send1') {
606 #  $DB::single = 1;
607     confess "local var gone missing" if !ref $self->{loc};
608         my $loc = $self->{loc};
609         $loc->{subject} = $line;
610         $loc->{lines} = [];
611         $self->state('sendbody');
612         #push @out, $self->msg('sendbody');
613         push @out, "Enter Message /EX (^Z) to send or /ABORT (^Y) to exit";
614   } elsif ($self->state eq 'sendbody') {
615     confess "local var gone missing" if !ref $self->{loc};
616         my $loc = $self->{loc};
617         if ($line eq "\032" || uc $line eq "/EX") {
618       my $to;
619
620       if (@{$loc->{lines}} > 0) {
621             foreach $to (@{$loc->{to}}) {
622               my $ref;
623                   my $systime = $main::systime;
624                   my $mycall = $main::mycall;
625                   $ref = DXMsg->alloc(DXMsg::next_transno('Msgno'),
626                                 uc $to,
627                                                         $self->call, 
628                                                         $systime,
629                                                         $loc->{private}, 
630                                                         $loc->{subject}, 
631                                                         $mycall,
632                                                         '0',
633                                                         $loc->{rrreq});
634               $ref->store($loc->{lines});
635                   $ref->add_dir();
636                   #push @out, $self->msg('sendsent', $to);
637                   push @out, "msgno $ref->{msgno} sent to $to";
638                   my $dxchan = DXChannel->get(uc $to);
639                   $dxchan->send("New mail has arrived for you") if $dxchan;
640             }
641           }
642           delete $loc->{lines};
643           delete $loc->{to};
644           delete $self->{loc};
645           $self->state('prompt');
646           $self->func(undef);
647           DXMsg::queue_msg();
648     } elsif ($line eq "\031" || uc $line eq "/ABORT" || uc $line eq "/QUIT") {
649       #push @out, $self->msg('sendabort');
650           push @out, "aborted";
651           delete $loc->{lines};
652           delete $loc->{to};
653           delete $self->{loc};
654           $self->func(undef);
655           $self->state('prompt');
656     } else {
657   
658       # i.e. it ain't and end or abort, therefore store the line
659       push @{$loc->{lines}}, $line;
660     }
661   }
662   return (1, @out);
663 }
664
665 # return the standard directory line for this ref 
666 sub dir
667 {
668   my $ref = shift;
669   return sprintf "%6d%s%s%5d %8.8s %8.8s %-6.6s %5.5s %-30.30s", 
670     $ref->msgno, $ref->read ? '-' : ' ', $ref->private ? 'p' : ' ', $ref->size,
671         $ref->to, $ref->from, cldate($ref->t), ztime($ref->t), $ref->subject;
672 }
673
674 no strict;
675 sub AUTOLOAD
676 {
677   my $self = shift;
678   my $name = $AUTOLOAD;
679   return if $name =~ /::DESTROY$/;
680   $name =~ s/.*:://o;
681   
682   confess "Non-existant field '$AUTOLOAD'" if !$valid{$name};
683   @_ ? $self->{$name} = shift : $self->{$name} ;
684 }
685
686 1;
687
688 __END__