added directory command + dummy read, send and reply
[spider.git] / perl / DXMsg.pm
1 #!/usr/bin/perl
2 #
3 # This module impliments the message handling for a dx cluster
4 #
5 # Copyright (c) 1998 Dirk Koopman G1TLH
6 #
7 # $Id$
8
9
10 package DXMsg;
11
12 @ISA = qw(DXProt DXChannel);
13
14 use DXUtil;
15 use DXChannel;
16 use DXUser;
17 use DXM;
18 use DXCluster;
19 use DXProtVars;
20 use DXProtout;
21 use DXDebug;
22 use FileHandle;
23 use Carp;
24
25 use strict;
26 use vars qw(%work @msg $msgdir %valid);
27
28 %work = ();                # outstanding jobs
29 @msg = ();                 # messages we have
30 $msgdir = "$main::root/msg";              # directory contain the msgs
31
32 %valid = (
33   fromnode => '9,From Node',
34   tonode => '9,To Node',
35   to => '0,To',
36   from => '0,From',
37   t => '0,Msg Time,cldatetime',
38   private => '9,Private',
39   subject => '0,Subject',
40   linesreq => '0,Lines per Gob',
41   rrreq => '9,Read Confirm',
42   origin => '0,Origin',
43   lines => '5,Data',
44   stream => '9,Stream No',
45   count => '9,Gob Linecnt',
46   file => '9,File?,yesno',
47   gotit => '9,Got it Nodes,parray',
48   lines => '9,Lines,parray',
49   read => '9,Times read',
50   size => '0,Size',
51   msgno => '0,Msgno',
52 );
53
54 # allocate a new object
55 # called fromnode, tonode, from, to, datetime, private?, subject, nolinesper  
56 sub alloc                  
57 {
58   my $pkg = shift;
59   my $self = bless {}, $pkg;
60   $self->{msgno} = shift;
61   $self->{to} = shift;
62   $self->{from} = shift;
63   $self->{t} = shift;
64   $self->{private} = shift;
65   $self->{subject} = shift;
66   $self->{origin} = shift;
67   $self->{read} = shift;
68     
69   return $self;
70 }
71
72 sub workclean
73 {
74   my $ref = shift;
75   delete $ref->{lines};
76   delete $ref->{linesreq};
77   delete $ref->{tonode};
78   delete $ref->{fromnode};
79   delete $ref->{stream};
80   delete $ref->{lines};
81   delete $ref->{file};
82   delete $ref->{count};
83 }
84
85 sub process
86 {
87   my ($self, $line) = @_;
88   my @f = split /[\^\~]/, $line;
89   my ($pcno) = $f[0] =~ /^PC(\d\d)/;          # just get the number
90   
91   SWITCH: {
92     if ($pcno == 28) {                        # incoming message
93           my $t = cltounix($f[5], $f[6]);
94           my $stream = next_transno($f[2]);
95           my $ref = DXMsg->alloc($stream, $f[3], $f[4], $t, $f[7], $f[8], $f[13], '0');
96           
97           # fill in various forwarding state variables
98       $ref->{fromnode} = $f[2];
99       $ref->{tonode} = $f[1];
100           $ref->{rrreq} = $f[11];
101           $ref->{linesreq} = $f[10];
102           $ref->{stream} = $stream;
103           $ref->{count} = 0;                      # no of lines between PC31s
104           dbg('msg', "new message from $f[4] to $f[3] '$f[8]' stream $stream\n");
105       $work{"$f[1]$f[2]$stream"} = $ref;         # store in work
106           $self->send(DXProt::pc30($f[2], $f[1], $stream)); # send ack
107           last SWITCH;
108         }
109         
110     if ($pcno == 29) {                        # incoming text
111           my $ref = $work{"$f[1]$f[2]$f[3]"};
112           if ($ref) {
113             push @{$ref->{lines}}, $f[4];
114                 $ref->{count}++;
115                 if ($ref->{count} >= $ref->{linesreq}) {
116                   $self->send(DXProt::pc31($f[2], $f[1], $f[3]));
117                   dbg('msg', "stream $f[3]: $ref->{linereq} lines received\n");
118                   $ref->{count} = 0;
119                 }
120           }
121           last SWITCH;
122         }
123         
124     if ($pcno == 30) {
125           last SWITCH;
126         }
127         
128     if ($pcno == 31) {
129           last SWITCH;
130         }
131         
132     if ($pcno == 32) {                         # incoming EOM
133           dbg('msg', "stream $f[3]: EOM received\n");
134           my $ref = $work{"$f[1]$f[2]$f[3]"};
135           if ($ref) {
136             $self->send(DXProt::pc33($f[2], $f[1], $f[3]));# acknowledge it
137                 $ref->store($ref->{lines});                    # store it (whatever that may mean)
138                 $ref->workclean;
139                 delete $work{"$f[1]$f[2]$f[3]"};       # remove the reference from the work vector
140           }
141           last SWITCH;
142         }
143         
144     if ($pcno == 33) {
145           last SWITCH;
146         }
147         
148         if ($pcno == 40) {                         # this is a file request
149           $f[3] =~ s/\\/\//og;                     # change the slashes
150           $f[3] =~ s/\.//og;                       # remove dots
151           $f[3] = lc $f[3];                        # to lower case;
152           dbg('msg', "incoming file $f[3]\n");
153           last SWITCH if $f[3] =~ /^\/(perl|cmd|local_cmd|src|lib|include|sys|msg)\//;    # prevent access to executables
154           
155           # create any directories
156           my @part = split /\//, $f[3];
157           my $part;
158           my $fn = "$main::root";
159           pop @part;         # remove last part
160           foreach $part (@part) {
161             $fn .= "/$part";
162                 next if -e $fn;
163             last SWITCH if !mkdir $fn, 0777;
164         dbg('msg', "created directory $fn\n");
165           }
166           my $stream = next_transno($f[2]);
167           my $ref = DXMsg->alloc($stream, "$main::root/$f[3]", $self->call, time, !$f[4], $f[3], ' ', '0');
168           
169           # forwarding variables
170       $ref->{fromnode} = $f[1];
171       $ref->{tonode} = $f[2];
172           $ref->{linesreq} = $f[5];
173           $ref->{stream} = $stream;
174           $ref->{count} = 0;                      # no of lines between PC31s
175           $ref->{file} = 1;
176       $work{"$f[1]$f[2]$stream"} = $ref;         # store in work
177           $self->send(DXProt::pc30($f[2], $f[1], $stream));  # send ack 
178           
179           last SWITCH;
180         }
181   }
182 }
183
184
185 # store a message away on disc or whatever
186 sub store
187 {
188   my $ref = shift;
189   my $lines = shift;
190   
191   # we only proceed if there are actually any lines in the file
192   if (@{$lines} == 0) {
193         return;
194   }
195   
196   if ($ref->{file}) {   # a file
197     dbg('msg', "To be stored in $ref->{to}\n");
198   
199     my $fh = new FileHandle "$ref->{to}", "w";
200         if (defined $fh) {
201           my $line;
202           foreach $line (@{$lines}) {
203                 print $fh "$line\n";
204           }
205           $fh->close;
206           dbg('msg', "file $ref->{to} stored\n");
207     } else {
208       confess "can't open file $ref->{to} $!";  
209     }
210 #       push @{$ref->{gotit}}, $ref->{fromnode} if $ref->{fromnode};
211   } else {              # a normal message
212
213     # get the next msg no - note that this has NOTHING to do with the stream number in PC protocol
214         my $msgno = next_transno("Msgno");
215
216     # attempt to open the message file
217         my $fn = filename($msgno);
218
219     dbg('msg', "To be stored in $fn\n");
220   
221     my $fh = new FileHandle "$fn", "w";
222         if (defined $fh) {
223       print $fh "=== $msgno^$ref->{to}^$ref->{from}^$ref->{t}^$ref->{private}^$ref->{subject}^$ref->{origin}^$ref->{read}\n";
224           print $fh "=== $ref->{fromnode}\n";
225           my $line;
226           foreach $line (@{$lines}) {
227         $ref->{size} += (length $line) + 1;
228                 print $fh "$line\n";
229           }
230           $ref->{gotit} = [];
231           $ref->{msgno} = $msgno;
232           push @{$ref->{gotit}}, $ref->{fromnode} if $ref->{fromnode};
233           push @msg, $ref;           # add this message to the incore message list
234           $fh->close;
235           dbg('msg', "msg $msgno stored\n");
236     } else {
237       confess "can't open msg file $fn $!";  
238     }
239   }
240 }
241
242 # delete a message
243 sub del_msg
244 {
245   my $self = shift;
246
247   # remove it from the active message list
248   @msg = map { $_ != $self ? $_ : () } @msg;
249   
250   # remove the file
251   unlink filename($self->{msgno});
252 }
253
254 # read in a message header
255 sub read_msg_header
256
257   my $fn = shift;
258   my $file;
259   my $line;
260   my $ref;
261   my @f;
262   my $size;
263
264   $file = new FileHandle;
265   if (!open($file, $fn)) {
266     print "Error reading $fn $!\n";
267     return undef;
268   }
269   $size = -s $fn;
270   $line = <$file>;       # first line
271   chomp $line;
272   $size -= length $line;
273   if (! $line =~ /^===/o) {
274     print "corrupt first line in $fn ($line)\n";
275     return undef;
276   }
277   $line =~ s/^=== //o;
278   @f = split /\^/, $line;
279   $ref = DXMsg->alloc(@f);
280   
281   $line = <$file>;       # second line
282   chomp $line;
283   $size -= length $line;
284   if (! $line =~ /^===/o) {
285     print "corrupt second line in $fn ($line)\n";
286     return undef;
287   }
288   $line =~ s/^=== //o;
289   $ref->{gotit} = [];
290   @f = split /\^/, $line;
291   push @{$ref->{goit}}, @f;
292   $ref->{size} = $size;
293  
294   close($file);
295   
296   return $ref;
297 }
298
299 # read in a message header
300 sub read_msg_body
301 {
302   my $self = shift;
303   my $msgno = $self->{msgno};
304   my $file;
305   my $line;
306   my $fn = filename($msgno);
307   my @out;
308
309   $file = new FileHandle;
310   if (!open($file, $fn)) {
311     print "Error reading $fn $!\n";
312     return undef;
313   }
314   chomp (@out = <$file>);
315   close($file);
316   
317   shift @out if $out[0] =~ /^=== \d+\^/;
318   shift @out if $out[0] =~ /^=== \d+\^/;
319   return @out;
320 }
321
322 # get a new transaction number from the file specified
323 sub next_transno
324 {
325   my $name = shift;
326   $name =~ s/\W//og;      # remove non-word characters
327   my $fn = "$msgdir/$name";
328   my $msgno;
329   
330   my $fh = new FileHandle;
331   if (sysopen($fh, $fn, O_RDWR|O_CREAT, 0666)) {
332     $fh->autoflush(1);
333         $msgno = $fh->getline;
334         chomp $msgno;
335         $msgno++;
336         seek $fh, 0, 0;
337         $fh->print("$msgno\n");
338         dbg('msg', "msgno $msgno allocated for $name\n");
339         $fh->close;
340   } else {
341     confess "can't open $fn $!";
342   }
343   return $msgno;
344 }
345
346 # initialise the message 'system', read in all the message headers
347 sub init
348 {
349   my $dir = new FileHandle;
350   my @dir;
351   my $ref;
352
353   # read in the directory
354   opendir($dir, $msgdir) or confess "can't open $msgdir $!";
355   @dir = readdir($dir);
356   closedir($dir);
357   
358   for (sort @dir) {
359     next if /^\./o;
360         next if ! /^m\d+/o;
361
362     $ref = read_msg_header("$msgdir/$_");
363         next if !$ref;
364         
365         # add the clusters that have this
366         push @msg, $ref; 
367         
368   }
369 }
370
371 # return all the current messages
372 sub get_all
373 {
374   return @msg;
375 }
376
377 # return the official filename for a message no
378 sub filename
379 {
380   return sprintf "$msgdir/m%06d", shift;
381 }
382
383 #
384 # return a list of valid elements 
385
386
387 sub fields
388 {
389   return keys(%valid);
390 }
391
392 #
393 # return a prompt for a field
394 #
395
396 sub field_prompt
397
398   my ($self, $ele) = @_;
399   return $valid{$ele};
400 }
401
402 no strict;
403 sub AUTOLOAD
404 {
405   my $self = shift;
406   my $name = $AUTOLOAD;
407   return if $name =~ /::DESTROY$/;
408   $name =~ s/.*:://o;
409   
410   confess "Non-existant field '$AUTOLOAD'" if !$valid{$name};
411   @_ ? $self->{$name} = shift : $self->{$name} ;
412 }
413
414
415 1;
416
417 __END__