no text line msgs should now propagate
[spider.git] / perl / DXMsg.pm
1 #!/usr/bin/perl
2 #
3 # This module impliments the message handling for a dx cluster
4 #
5 # Copyright (c) 1998 Dirk Koopman G1TLH
6 #
7 # $Id$
8 #
9 #
10 # Notes for implementors:-
11 #
12 # PC28 field 11 is the RR required flag
13 # PC28 field 12 is a VIA routing (ie it is a node call) 
14 #
15
16 package DXMsg;
17
18 @ISA = qw(DXProt DXChannel);
19
20 use DXUtil;
21 use DXChannel;
22 use DXUser;
23 use DXM;
24 use DXCluster;
25 use DXProtVars;
26 use DXProtout;
27 use DXDebug;
28 use DXLog;
29 use IO::File;
30 use Fcntl;
31
32 use strict;
33 use vars qw(%work @msg $msgdir %valid %busy $maxage $last_clean
34                         @badmsg @swop $swopfn $badmsgfn $forwardfn @forward $timeout $waittime
35                     $queueinterval $lastq $importfn $minchunk $maxchunk);
36
37 %work = ();                                             # outstanding jobs
38 @msg = ();                                              # messages we have
39 %busy = ();                                             # station interlocks
40 $msgdir = "$main::root/msg";    # directory contain the msgs
41 $maxage = 30 * 86400;                   # the maximum age that a message shall live for if not marked 
42 $last_clean = 0;                                # last time we did a clean
43 @forward = ();                  # msg forward table
44 @badmsg = ();                                   # bad message table
45 @swop = ();                                             # swop table
46 $timeout = 30*60;               # forwarding timeout
47 $waittime = 30*60;              # time an aborted outgoing message waits before trying again
48 $queueinterval = 1*60;          # run the queue every 1 minute
49 $lastq = 0;
50
51 $minchunk = 4800;               # minimum chunk size for a split message
52 $maxchunk = 6000;               # maximum chunk size
53
54 $badmsgfn = "$msgdir/badmsg.pl";    # list of TO address we wont store
55 $forwardfn = "$msgdir/forward.pl";  # the forwarding table
56 $swopfn = "$msgdir/swop.pl";        # the swopping table
57 $importfn = "$msgdir/import";       # import directory
58
59
60 %valid = (
61                   fromnode => '5,From Node',
62                   tonode => '5,To Node',
63                   to => '0,To',
64                   from => '0,From',
65                   t => '0,Msg Time,cldatetime',
66                   private => '5,Private',
67                   subject => '0,Subject',
68                   linesreq => '0,Lines per Gob',
69                   rrreq => '5,Read Confirm',
70                   origin => '0,Origin',
71                   lines => '5,Data',
72                   stream => '9,Stream No',
73                   count => '5,Gob Linecnt',
74                   file => '5,File?,yesno',
75                   gotit => '5,Got it Nodes,parray',
76                   lines => '5,Lines,parray',
77                   'read' => '5,Times read',
78                   size => '0,Size',
79                   msgno => '0,Msgno',
80                   keep => '0,Keep this?,yesno',
81                   lastt => '5,Last processed,cldatetime',
82                   waitt => '5,Wait until,cldatetime',
83                  );
84
85 sub DESTROY
86 {
87         my $self = shift;
88         undef $self->{lines};
89         undef $self->{gotit};
90 }
91
92 # allocate a new object
93 # called fromnode, tonode, from, to, datetime, private?, subject, nolinesper  
94 sub alloc                  
95 {
96         my $pkg = shift;
97         my $self = bless {}, $pkg;
98         $self->{msgno} = shift;
99         my $to = shift;
100         #  $to =~ s/-\d+$//o;
101         $self->{to} = ($to eq $main::mycall) ? $main::myalias : $to;
102         my $from = shift;
103         $from =~ s/-\d+$//o;
104         $self->{from} = uc $from;
105         $self->{t} = shift;
106         $self->{private} = shift;
107         $self->{subject} = shift;
108         $self->{origin} = shift;
109         $self->{'read'} = shift;
110         $self->{rrreq} = shift;
111         $self->{gotit} = [];
112         $self->{lastt} = $main::systime;
113         $self->{lines} = [];
114     
115         return $self;
116 }
117
118 sub workclean
119 {
120         my $ref = shift;
121         delete $ref->{lines};
122         delete $ref->{linesreq};
123         delete $ref->{tonode};
124         delete $ref->{fromnode};
125         delete $ref->{stream};
126         delete $ref->{file};
127         delete $ref->{count};
128         delete $ref->{lastt} if exists $ref->{lastt};
129         delete $ref->{waitt} if exists $ref->{waitt};
130 }
131
132 sub process
133 {
134         my ($self, $line) = @_;
135
136         # this is periodic processing
137         if (!$self || !$line) {
138
139                 if ($main::systime >= $lastq + $queueinterval) {
140
141                         # wander down the work queue stopping any messages that have timed out
142                         for (keys %busy) {
143                                 my $node = $_;
144                                 my $ref = $busy{$_};
145                                 if (exists $ref->{lastt} && $main::systime >= $ref->{lastt} + $timeout) {
146                                         dbg('msg', "Timeout, stopping msgno: $ref->{msgno} -> $node");
147                                         $ref->stop_msg($node);
148                                         
149                                         # delay any outgoing messages that fail
150                                         $ref->{waitt} = $main::systime + $waittime + rand(120) if $node ne $main::mycall;
151                                 }
152                         }
153
154                         # queue some message if the interval timer has gone off
155                         queue_msg(0);
156
157                         # import any messages in the import directory
158                         import_msgs();
159                         
160                         $lastq = $main::systime;
161                 }
162
163                 # clean the message queue
164                 clean_old() if $main::systime - $last_clean > 3600 ;
165                 return;
166         }
167
168         my @f = split /\^/, $line;
169         my ($pcno) = $f[0] =~ /^PC(\d\d)/; # just get the number
170
171  SWITCH: {
172                 if ($pcno == 28) {              # incoming message
173
174                         # first look for any messages in the busy queue 
175                         # and cancel them this should both resolve timed out incoming messages
176                         # and crossing of message between nodes, incoming messages have priority
177                         if (exists $busy{$f[2]}) {
178                                 my $ref = $busy{$f[2]};
179                                 my $tonode = $ref->{tonode};
180                                 dbg('msg', "Busy, stopping msgno: $ref->{msgno} -> $f[2]");
181                                 $ref->stop_msg($self->call);
182                         }
183
184                         my $t = cltounix($f[5], $f[6]);
185                         my $stream = next_transno($f[2]);
186                         $f[13] = $self->call unless $f[13] && $f[13] gt ' ';
187                         my $ref = DXMsg->alloc($stream, uc $f[3], $f[4], $t, $f[7], $f[8], $f[13], '0', $f[11]);
188                         
189                         # fill in various forwarding state variables
190                         $ref->{fromnode} = $f[2];
191                         $ref->{tonode} = $f[1];
192                         $ref->{rrreq} = $f[11];
193                         $ref->{linesreq} = $f[10];
194                         $ref->{stream} = $stream;
195                         $ref->{count} = 0;      # no of lines between PC31s
196                         dbg('msg', "new message from $f[4] to $f[3] '$f[8]' stream $stream\n");
197                         Log('msg', "Incoming message $f[4] to $f[3] '$f[8]'" );
198                         $work{"$f[2]$stream"} = $ref; # store in work
199                         $busy{$f[2]} = $ref; # set interlock
200                         $self->send(DXProt::pc30($f[2], $f[1], $stream)); # send ack
201                         $ref->{lastt} = $main::systime;
202
203                         # look to see whether this is a non private message sent to a known callsign
204                         my $uref = DXUser->get_current($ref->{to});
205                         if (iscallsign($ref->{to}) && !$ref->{private} && $uref && $uref->homenode) {
206                                 $ref->{private} = 1;
207                                 dbg('msg', "set bull to $ref->{to} to private");
208                         }
209                         last SWITCH;
210                 }
211                 
212                 if ($pcno == 29) {              # incoming text
213                         my $ref = $work{"$f[2]$f[3]"};
214                         if ($ref) {
215                                 $f[4] =~ s/\%5E/^/g;
216                                 push @{$ref->{lines}}, $f[4];
217                                 $ref->{count}++;
218                                 if ($ref->{count} >= $ref->{linesreq}) {
219                                         $self->send(DXProt::pc31($f[2], $f[1], $f[3]));
220                                         dbg('msg', "stream $f[3]: $ref->{count} lines received\n");
221                                         $ref->{count} = 0;
222                                 }
223                                 $ref->{lastt} = $main::systime;
224                         } else {
225                                 dbg('msg', "PC29 from unknown stream $f[3] from $f[2]" );
226                                 $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream
227                         }
228                         last SWITCH;
229                 }
230                 
231                 if ($pcno == 30) {              # this is a incoming subject ack
232                         my $ref = $work{$f[2]}; # note no stream at this stage
233                         if ($ref) {
234                                 delete $work{$f[2]};
235                                 $ref->{stream} = $f[3];
236                                 $ref->{count} = 0;
237                                 $ref->{linesreq} = 5;
238                                 $work{"$f[2]$f[3]"} = $ref;     # new ref
239                                 dbg('msg', "incoming subject ack stream $f[3]\n");
240                                 $busy{$f[2]} = $ref; # interlock
241                                 push @{$ref->{lines}}, ($ref->read_msg_body);
242                                 $ref->send_tranche($self);
243                                 $ref->{lastt} = $main::systime;
244                         } else {
245                                 dbg('msg', "PC30 from unknown stream $f[3] from $f[2]" );
246                                 $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream
247                         } 
248                         last SWITCH;
249                 }
250                 
251                 if ($pcno == 31) {              # acknowledge a tranche of lines
252                         my $ref = $work{"$f[2]$f[3]"};
253                         if ($ref) {
254                                 dbg('msg', "tranche ack stream $f[3]\n");
255                                 $ref->send_tranche($self);
256                                 $ref->{lastt} = $main::systime;
257                         } else {
258                                 dbg('msg', "PC31 from unknown stream $f[3] from $f[2]" );
259                                 $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream
260                         } 
261                         last SWITCH;
262                 }
263                 
264                 if ($pcno == 32) {              # incoming EOM
265                         dbg('msg', "stream $f[3]: EOM received\n");
266                         my $ref = $work{"$f[2]$f[3]"};
267                         if ($ref) {
268                                 $self->send(DXProt::pc33($f[2], $f[1], $f[3])); # acknowledge it
269                                 
270                                 # get the next msg no - note that this has NOTHING to do with the stream number in PC protocol
271                                 # store the file or message
272                                 # remove extraneous rubbish from the hash
273                                 # remove it from the work in progress vector
274                                 # stuff it on the msg queue
275                                 if ($ref->{lines}) {
276                                         if ($ref->{file}) {
277                                                 $ref->store($ref->{lines});
278                                         } else {
279
280                                                 # does an identical message already exist?
281                                                 my $m;
282                                                 for $m (@msg) {
283                                                         if ($ref->{subject} eq $m->{subject} && $ref->{t} == $m->{t} && $ref->{from} eq $m->{from} && $ref->{to} eq $m->{to}) {
284                                                                 $ref->stop_msg($self->call);
285                                                                 my $msgno = $m->{msgno};
286                                                                 dbg('msg', "duplicate message from $ref->{from} -> $ref->{to} to $msgno");
287                                                                 Log('msg', "duplicate message from $ref->{from} -> $ref->{to} to $msgno");
288                                                                 return;
289                                                         }
290                                                 }
291
292                                                 # swop addresses
293                                                 $ref->swop_it($self->call);
294                                                 
295                                                 # look for 'bad' to addresses 
296 #                                               if (grep $ref->{to} eq $_, @badmsg) {
297                                                 if ($ref->dump_it($self->call)) {
298                                                         $ref->stop_msg($self->call);
299                                                         dbg('msg', "'Bad' message $ref->{to}");
300                                                         Log('msg', "'Bad' message $ref->{to}");
301                                                         return;
302                                                 }
303
304                                                 $ref->{msgno} = next_transno("Msgno");
305                                                 push @{$ref->{gotit}}, $f[2]; # mark this up as being received
306                                                 $ref->store($ref->{lines});
307                                                 add_dir($ref);
308                                                 my $dxchan = DXChannel->get($ref->{to});
309                                                 $dxchan->send($dxchan->msg('m9')) if $dxchan && $dxchan->is_user;
310                                                 Log('msg', "Message $ref->{msgno} from $ref->{from} received from $f[2] for $ref->{to}");
311                                         }
312                                 }
313                                 $ref->stop_msg($self->call);
314                         } else {
315                                 dbg('msg', "PC32 from unknown stream $f[3] from $f[2]" );
316                                 $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream
317                         }
318                         # queue_msg(0);
319                         last SWITCH;
320                 }
321                 
322                 if ($pcno == 33) {              # acknowledge the end of message
323                         my $ref = $work{"$f[2]$f[3]"};
324                         if ($ref) {
325                                 if ($ref->{private}) { # remove it if it private and gone off site#
326                                         Log('msg', "Message $ref->{msgno} from $ref->{from} sent to $f[2] and deleted");
327                                         $ref->del_msg;
328                                 } else {
329                                         Log('msg', "Message $ref->{msgno} from $ref->{from} sent to $f[2]");
330                                         push @{$ref->{gotit}}, $f[2]; # mark this up as being received
331                                         $ref->store($ref->{lines});     # re- store the file
332                                 }
333                                 $ref->stop_msg($self->call);
334                         } else {
335                                 dbg('msg', "PC33 from unknown stream $f[3] from $f[2]" );
336                                 $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream
337                         } 
338
339                         # send next one if present
340                         queue_msg(0);
341                         last SWITCH;
342                 }
343                 
344                 if ($pcno == 40) {              # this is a file request
345                         $f[3] =~ s/\\/\//og; # change the slashes
346                         $f[3] =~ s/\.//og;      # remove dots
347                         $f[3] =~ s/^\///o;   # remove the leading /
348                         $f[3] = lc $f[3];       # to lower case;
349                         dbg('msg', "incoming file $f[3]\n");
350                         $f[3] = 'packclus/' . $f[3] unless $f[3] =~ /^packclus\//o;
351                         
352                         # create any directories
353                         my @part = split /\//, $f[3];
354                         my $part;
355                         my $fn = "$main::root";
356                         pop @part;                      # remove last part
357                         foreach $part (@part) {
358                                 $fn .= "/$part";
359                                 next if -e $fn;
360                                 last SWITCH if !mkdir $fn, 0777;
361                                 dbg('msg', "created directory $fn\n");
362                         }
363                         my $stream = next_transno($f[2]);
364                         my $ref = DXMsg->alloc($stream, "$main::root/$f[3]", $self->call, time, !$f[4], $f[3], ' ', '0', '0');
365                         
366                         # forwarding variables
367                         $ref->{fromnode} = $f[1];
368                         $ref->{tonode} = $f[2];
369                         $ref->{linesreq} = $f[5];
370                         $ref->{stream} = $stream;
371                         $ref->{count} = 0;      # no of lines between PC31s
372                         $ref->{file} = 1;
373                         $ref->{lastt} = $main::systime;
374                         $work{"$f[2]$stream"} = $ref; # store in work
375                         $self->send(DXProt::pc30($f[2], $f[1], $stream)); # send ack 
376                         
377                         last SWITCH;
378                 }
379                 
380                 if ($pcno == 42) {              # abort transfer
381                         dbg('msg', "stream $f[3]: abort received\n");
382                         my $ref = $work{"$f[2]$f[3]"};
383                         if ($ref) {
384                                 $ref->stop_msg($self->call);
385                                 $ref = undef;
386                         }
387                         last SWITCH;
388                 }
389
390                 if ($pcno == 49) {      # global delete on subject
391                         for (@msg) {
392                                 if ($_->{from} eq $f[1] && $_->{subject} eq $f[2]) {
393                                         $_->del_msg();
394                                         Log('msg', "Message $_->{msgno} from $_->{from} ($_->{subject}) fully deleted");
395                                         DXProt::broadcast_ak1a($line, $self);
396                                 }
397                         }
398                 }
399         }
400 }
401
402
403 # store a message away on disc or whatever
404 #
405 # NOTE the second arg is a REFERENCE not a list
406 sub store
407 {
408         my $ref = shift;
409         my $lines = shift;
410
411         if ($ref->{file}) {                     # a file
412                 dbg('msg', "To be stored in $ref->{to}\n");
413                 
414                 my $fh = new IO::File "$ref->{to}", "w";
415                 if (defined $fh) {
416                         my $line;
417                         foreach $line (@{$lines}) {
418                                 print $fh "$line\n";
419                         }
420                         $fh->close;
421                         dbg('msg', "file $ref->{to} stored\n");
422                         Log('msg', "file $ref->{to} from $ref->{from} stored" );
423                 } else {
424                         confess "can't open file $ref->{to} $!";  
425                 }
426         } else {                                        # a normal message
427
428                 # attempt to open the message file
429                 my $fn = filename($ref->{msgno});
430                 
431                 dbg('msg', "To be stored in $fn\n");
432                 
433                 # now save the file, overwriting what's there, YES I KNOW OK! (I will change it if it's a problem)
434                 my $fh = new IO::File "$fn", "w";
435                 if (defined $fh) {
436                         my $rr = $ref->{rrreq} ? '1' : '0';
437                         my $priv = $ref->{private} ? '1': '0';
438                         print $fh "=== $ref->{msgno}^$ref->{to}^$ref->{from}^$ref->{t}^$priv^$ref->{subject}^$ref->{origin}^$ref->{'read'}^$rr\n";
439                         print $fh "=== ", join('^', @{$ref->{gotit}}), "\n";
440                         my $line;
441                         $ref->{size} = 0;
442                         foreach $line (@{$lines}) {
443                                 $ref->{size} += (length $line) + 1;
444                                 print $fh "$line\n";
445                         }
446                         $fh->close;
447                         dbg('msg', "msg $ref->{msgno} stored\n");
448                         Log('msg', "msg $ref->{msgno} from $ref->{from} to $ref->{to} stored" );
449                 } else {
450                         confess "can't open msg file $fn $!";  
451                 }
452         }
453 }
454
455 # delete a message
456 sub del_msg
457 {
458         my $self = shift;
459         
460         # remove it from the active message list
461         @msg = map { $_ != $self ? $_ : () } @msg;
462         
463         # belt and braces (one day I will ask someone if this is REALLY necessary)
464         delete $self->{gotit};
465         delete $self->{list};
466         
467         # remove the file
468         unlink filename($self->{msgno});
469         dbg('msg', "deleting $self->{msgno}\n");
470 }
471
472 # clean out old messages from the message queue
473 sub clean_old
474 {
475         my $ref;
476         
477         # mark old messages for deletion
478         foreach $ref (@msg) {
479                 if (!$ref->{keep} && $ref->{t} < $main::systime - $maxage) {
480                         $ref->{deleteme} = 1;
481                         delete $ref->{gotit};
482                         delete $ref->{list};
483                         unlink filename($ref->{msgno});
484                         dbg('msg', "deleting old $ref->{msgno}\n");
485                 }
486         }
487         
488         # remove them all from the active message list
489         @msg = map { $_->{deleteme} ? () : $_ } @msg;
490         $last_clean = $main::systime;
491 }
492
493 # read in a message header
494 sub read_msg_header
495
496         my $fn = shift;
497         my $file;
498         my $line;
499         my $ref;
500         my @f;
501         my $size;
502         
503         $file = new IO::File "$fn";
504         if (!$file) {
505             dbg('err', "Error reading $fn $!");
506             Log('err', "Error reading $fn $!");
507                 return undef;
508         }
509         $size = -s $fn;
510         $line = <$file>;                        # first line
511         if ($size == 0 || !$line) {
512             dbg('err', "Empty $fn $!");
513             Log('err', "Empty $fn $!");
514                 return undef;
515         }
516         chomp $line;
517         $size -= length $line;
518         if (! $line =~ /^===/o) {
519                 dbg('err', "corrupt first line in $fn ($line)");
520                 Log('err', "corrupt first line in $fn ($line)");
521                 return undef;
522         }
523         $line =~ s/^=== //o;
524         @f = split /\^/, $line;
525         $ref = DXMsg->alloc(@f);
526         
527         $line = <$file>;                        # second line
528         chomp $line;
529         $size -= length $line;
530         if (! $line =~ /^===/o) {
531             dbg('err', "corrupt second line in $fn ($line)");
532             Log('err', "corrupt second line in $fn ($line)");
533                 return undef;
534         }
535         $line =~ s/^=== //o;
536         $ref->{gotit} = [];
537         @f = split /\^/, $line;
538         push @{$ref->{gotit}}, @f;
539         $ref->{size} = $size;
540         
541         close($file);
542         
543         return $ref;
544 }
545
546 # read in a message header
547 sub read_msg_body
548 {
549         my $self = shift;
550         my $msgno = $self->{msgno};
551         my $file;
552         my $line;
553         my $fn = filename($msgno);
554         my @out;
555         
556         $file = new IO::File;
557         if (!open($file, $fn)) {
558                 dbg('err' ,"Error reading $fn $!");
559                 Log('err' ,"Error reading $fn $!");
560                 return undef;
561         }
562         @out = map {chomp; $_} <$file>;
563         close($file);
564         
565         shift @out if $out[0] =~ /^=== /;
566         shift @out if $out[0] =~ /^=== /;
567         return @out;
568 }
569
570 # send a tranche of lines to the other end
571 sub send_tranche
572 {
573         my ($self, $dxchan) = @_;
574         my @out;
575         my $to = $self->{tonode};
576         my $from = $self->{fromnode};
577         my $stream = $self->{stream};
578         my $lines = $self->{lines};
579         my ($c, $i);
580         
581         for ($i = 0, $c = $self->{count}; $i < $self->{linesreq} && $c < @$lines; $i++, $c++) {
582                 push @out, DXProt::pc29($to, $from, $stream, $lines->[$c]);
583     }
584     $self->{count} = $c;
585
586     push @out, DXProt::pc32($to, $from, $stream) if $i < $self->{linesreq};
587         $dxchan->send(@out);
588 }
589
590         
591 # find a message to send out and start the ball rolling
592 sub queue_msg
593 {
594         my $sort = shift;
595         my $call = shift;
596         my $ref;
597         my $clref;
598         
599         # bat down the message list looking for one that needs to go off site and whose
600         # nearest node is not busy.
601
602         dbg('msg', "queue msg ($sort)\n");
603         my @nodelist = DXChannel::get_all_nodes;
604         foreach $ref (@msg) {
605                 # firstly, is it private and unread? if so can I find the recipient
606                 # in my cluster node list offsite?
607
608                 # ignore 'delayed' messages until their waiting time has expired
609                 if (exists $ref->{waitt}) {
610                         next if $ref->{waitt} > $main::systime;
611                         delete $ref->{waitt};
612                 } 
613
614                 # deal with routed private messages
615                 my $noderef;
616                 if ($ref->{private}) {
617                         next if $ref->{'read'};           # if it is read, it is stuck here
618                         $clref = DXCluster->get_exact($ref->{to});
619                         unless ($clref) {             # otherwise look for a homenode
620                                 my $uref = DXUser->get_current($ref->{to});
621                                 my $hnode =  $uref->homenode if $uref;
622                                 $clref = DXCluster->get_exact($hnode) if $hnode;
623                         }
624                         if ($clref && !grep { $clref->{dxchan} == $_ } DXCommandmode::get_all()) {
625                                 next if $clref->call eq $main::mycall;  # i.e. it lives here
626                                 $noderef = $clref->{dxchan};
627                                 $ref->start_msg($noderef) if !get_busy($noderef->call)  && $noderef->state eq 'normal';
628                         }
629                 }
630                 
631                 # otherwise we are dealing with a bulletin or forwarded private message
632                 # compare the gotit list with
633                 # the nodelist up above, if there are sites that haven't got it yet
634                 # then start sending it - what happens when we get loops is anyone's
635                 # guess, use (to, from, time, subject) tuple?
636                 foreach $noderef (@nodelist) {
637                         next if $noderef->call eq $main::mycall;
638                         next if grep { $_ eq $noderef->call } @{$ref->{gotit}};
639                         next unless $ref->forward_it($noderef->call);           # check the forwarding file
640
641                         # if we are here we have a node that doesn't have this message
642                         $ref->start_msg($noderef) if !get_busy($noderef->call)  && $noderef->state eq 'normal';
643                         last;
644                 }
645
646                 # if all the available nodes are busy then stop
647                 last if @nodelist == scalar grep { get_busy($_->call) } @nodelist;
648         }
649 }
650
651 # is there a message for me?
652 sub for_me
653 {
654         my $call = uc shift;
655         my $ref;
656         
657         foreach $ref (@msg) {
658                 # is it for me, private and unread? 
659                 if ($ref->{to} eq $call && $ref->{private}) {
660                         return 1 if !$ref->{'read'};
661                 }
662         }
663         return 0;
664 }
665
666 # start the message off on its travels with a PC28
667 sub start_msg
668 {
669         my ($self, $dxchan) = @_;
670         
671         dbg('msg', "start msg $self->{msgno}\n");
672         $self->{linesreq} = 5;
673         $self->{count} = 0;
674         $self->{tonode} = $dxchan->call;
675         $self->{fromnode} = $main::mycall;
676         $busy{$self->{tonode}} = $self;
677         $work{$self->{tonode}} = $self;
678         $self->{lastt} = $main::systime;
679         $dxchan->send(DXProt::pc28($self->{tonode}, $self->{fromnode}, $self->{to}, $self->{from}, $self->{t}, $self->{private}, $self->{subject}, $self->{origin}, $self->{rrreq}));
680 }
681
682 # get the ref of a busy node
683 sub get_busy
684 {
685         my $call = shift;
686         return $busy{$call};
687 }
688
689 # get the busy queue
690 sub get_all_busy
691 {
692         return values %busy;
693 }
694
695 # get the forwarding queue
696 sub get_fwq
697 {
698         return values %work;
699 }
700
701 # stop a message from continuing, clean it out, unlock interlocks etc
702 sub stop_msg
703 {
704         my $self = shift;
705         my $node = shift;
706         my $stream = $self->{stream} if exists $self->{stream};
707         
708         
709         dbg('msg', "stop msg $self->{msgno} -> node $node\n");
710         delete $work{$node};
711         delete $work{"$node$stream"} if $stream;
712         $self->workclean;
713         delete $busy{$node};
714 }
715
716 # get a new transaction number from the file specified
717 sub next_transno
718 {
719         my $name = shift;
720         $name =~ s/\W//og;                      # remove non-word characters
721         my $fn = "$msgdir/$name";
722         my $msgno;
723         
724         my $fh = new IO::File;
725         if (sysopen($fh, $fn, O_RDWR|O_CREAT, 0666)) {
726                 $fh->autoflush(1);
727                 $msgno = $fh->getline;
728                 chomp $msgno;
729                 $msgno++;
730                 seek $fh, 0, 0;
731                 $fh->print("$msgno\n");
732                 dbg('msg', "msgno $msgno allocated for $name\n");
733                 $fh->close;
734         } else {
735                 confess "can't open $fn $!";
736         }
737         return $msgno;
738 }
739
740 # initialise the message 'system', read in all the message headers
741 sub init
742 {
743         my $dir = new IO::File;
744         my @dir;
745         my $ref;
746                 
747         # load various control files
748         dbg('err', "load badmsg: " . (load_badmsg() or "Ok"));
749         dbg('err', "load forward: " . (load_forward() or "Ok"));
750         dbg('err', "load swop: " . (load_swop() or "Ok"));
751
752         # read in the directory
753         opendir($dir, $msgdir) or confess "can't open $msgdir $!";
754         @dir = readdir($dir);
755         closedir($dir);
756
757         @msg = ();
758         for (sort @dir) {
759                 next unless /^m\d\d\d\d\d\d$/;
760                 
761                 $ref = read_msg_header("$msgdir/$_");
762                 unless ($ref) {
763                         dbg('err', "Deleting $_");
764                         Log('err', "Deleting $_");
765                         unlink "$msgdir/$_";
766                         next;
767                 }
768                 
769                 # delete any messages to 'badmsg.pl' places
770                 if (grep $ref->{to} eq $_, @badmsg) {
771                         dbg('msg', "'Bad' TO address $ref->{to}");
772                         Log('msg', "'Bad' TO address $ref->{to}");
773                         $ref->del_msg;
774                         next;
775                 }
776
777                 # add the message to the available queue
778                 add_dir($ref); 
779         }
780 }
781
782 # add the message to the directory listing
783 sub add_dir
784 {
785         my $ref = shift;
786         confess "tried to add a non-ref to the msg directory" if !ref $ref;
787         push @msg, $ref;
788 }
789
790 # return all the current messages
791 sub get_all
792 {
793         return @msg;
794 }
795
796 # get a particular message
797 sub get
798 {
799         my $msgno = shift;
800         for (@msg) {
801                 return $_ if $_->{msgno} == $msgno;
802                 last if $_->{msgno} > $msgno;
803         }
804         return undef;
805 }
806
807 # return the official filename for a message no
808 sub filename
809 {
810         return sprintf "$msgdir/m%06d", shift;
811 }
812
813 #
814 # return a list of valid elements 
815
816
817 sub fields
818 {
819         return keys(%valid);
820 }
821
822 #
823 # return a prompt for a field
824 #
825
826 sub field_prompt
827
828         my ($self, $ele) = @_;
829         return $valid{$ele};
830 }
831
832 #
833 # send a message state machine
834 sub do_send_stuff
835 {
836         my $self = shift;
837         my $line = shift;
838         my @out;
839         
840         if ($self->state eq 'send1') {
841                 #  $DB::single = 1;
842                 confess "local var gone missing" if !ref $self->{loc};
843                 my $loc = $self->{loc};
844                 $loc->{subject} = $line;
845                 $loc->{lines} = [];
846                 $self->state('sendbody');
847                 #push @out, $self->msg('sendbody');
848                 push @out, $self->msg('m8');
849         } elsif ($self->state eq 'sendbody') {
850                 confess "local var gone missing" if !ref $self->{loc};
851                 my $loc = $self->{loc};
852                 if ($line eq "\032" || $line eq '%1A' || uc $line eq "/EX") {
853                         my $to;
854                         
855                         foreach $to (@{$loc->{to}}) {
856                                 my $ref;
857                                 my $systime = $main::systime;
858                                 my $mycall = $main::mycall;
859                                 $ref = DXMsg->alloc(DXMsg::next_transno('Msgno'),
860                                                                         uc $to,
861                                                                         exists $loc->{from} ? $loc->{from} : $self->call, 
862                                                                         $systime,
863                                                                         $loc->{private}, 
864                                                                         $loc->{subject}, 
865                                                                         exists $loc->{origin} ? $loc->{origin} : $mycall,
866                                                                         '0',
867                                                                         $loc->{rrreq});
868                                 $ref->swop_it($self->call);
869                                 $ref->store($loc->{lines});
870                                 $ref->add_dir();
871                                 push @out, $self->msg('m11', $ref->{msgno}, $to);
872                                 #push @out, "msgno $ref->{msgno} sent to $to";
873                                 my $dxchan = DXChannel->get(uc $to);
874                                 if ($dxchan) {
875                                         if ($dxchan->is_user()) {
876                                                 $dxchan->send($dxchan->msg('m9'));
877                                         }
878                                 }
879                         }
880
881                         delete $loc->{lines};
882                         delete $loc->{to};
883                         delete $self->{loc};
884                         $self->func(undef);
885                         
886                         $self->state('prompt');
887                 } elsif ($line eq "\031" || uc $line eq "/ABORT" || uc $line eq "/QUIT") {
888                         #push @out, $self->msg('sendabort');
889                         push @out, $self->msg('m10');
890                         delete $loc->{lines};
891                         delete $loc->{to};
892                         delete $self->{loc};
893                         $self->func(undef);
894                         $self->state('prompt');
895                 } else {
896                         
897                         # i.e. it ain't and end or abort, therefore store the line
898                         push @{$loc->{lines}}, length($line) > 0 ? $line : " ";
899                 }
900         }
901         return (1, @out);
902 }
903
904 # return the standard directory line for this ref 
905 sub dir
906 {
907         my $ref = shift;
908         return sprintf "%6d%s%s%5d %8.8s %8.8s %-6.6s %5.5s %-30.30s", 
909                 $ref->msgno, $ref->read ? '-' : ' ', $ref->private ? 'p' : ' ', $ref->size,
910                         $ref->to, $ref->from, cldate($ref->t), ztime($ref->t), $ref->subject;
911 }
912
913 # load the forward table
914 sub load_forward
915 {
916         my @out;
917         my $s = readfilestr($forwardfn);
918         if ($s) {
919                 eval $s;
920                 push @out, $@ if $@;
921         }
922         return @out;
923 }
924
925 # load the bad message table
926 sub load_badmsg
927 {
928         my @out;
929         my $s = readfilestr($badmsgfn);
930         if ($s) {
931                 eval $s;
932                 push @out, $@ if $@;
933         }
934         return @out;
935 }
936
937 # load the swop message table
938 sub load_swop
939 {
940         my @out;
941         my $s = readfilestr($swopfn);
942         if ($s) {
943                 eval $s;
944                 push @out, $@ if $@;
945         }
946         return @out;
947 }
948
949 #
950 # forward that message or not according to the forwarding table
951 # returns 1 for forward, 0 - to ignore
952 #
953
954 sub forward_it
955 {
956         my $ref = shift;
957         my $call = shift;
958         my $i;
959         
960         for ($i = 0; $i < @forward; $i += 5) {
961                 my ($sort, $field, $pattern, $action, $bbs) = @forward[$i..($i+4)]; 
962                 my $tested;
963                 
964                 # are we interested?
965                 next if $ref->{private} && $sort ne 'P';
966                 next if !$ref->{private} && $sort ne 'B';
967                 
968                 # select field
969                 $tested = $ref->{to} if $field eq 'T';
970                 $tested = $ref->{from} if $field eq 'F';
971                 $tested = $ref->{origin} if $field eq 'O';
972                 $tested = $ref->{subject} if $field eq 'S';
973
974                 if (!$pattern || $tested =~ m{$pattern}i) {
975                         return 0 if $action eq 'I';
976                         return 1 if !$bbs || grep $_ eq $call, @{$bbs};
977                 }
978         }
979         return 0;
980 }
981
982 sub dump_it
983 {
984         my $ref = shift;
985         my $call = shift;
986         my $i;
987         
988         for ($i = 0; $i < @badmsg; $i += 3) {
989                 my ($sort, $field, $pattern) = @badmsg[$i..($i+2)]; 
990                 my $tested;
991                 
992                 # are we interested?
993                 next if $ref->{private} && $sort ne 'P';
994                 next if !$ref->{private} && $sort ne 'B';
995                 
996                 # select field
997                 $tested = $ref->{to} if $field eq 'T';
998                 $tested = $ref->{from} if $field eq 'F';
999                 $tested = $ref->{origin} if $field eq 'O';
1000                 $tested = $ref->{subject} if $field eq 'S';
1001
1002                 if (!$pattern || $tested =~ m{$pattern}i) {
1003                         return 1;
1004                 }
1005         }
1006         return 0;
1007 }
1008
1009 sub swop_it
1010 {
1011         my $ref = shift;
1012         my $call = shift;
1013         my $i;
1014         my $count = 0;
1015         
1016         for ($i = 0; $i < @swop; $i += 5) {
1017                 my ($sort, $field, $pattern, $tfield, $topattern) = @swop[$i..($i+4)]; 
1018                 my $tested;
1019                 my $swop;
1020                 my $old;
1021                 
1022                 # are we interested?
1023                 next if $ref->{private} && $sort ne 'P';
1024                 next if !$ref->{private} && $sort ne 'B';
1025                 
1026                 # select field
1027                 $tested = $ref->{to} if $field eq 'T';
1028                 $tested = $ref->{from} if $field eq 'F';
1029                 $tested = $ref->{origin} if $field eq 'O';
1030                 $tested = $ref->{subject} if $field eq 'S';
1031
1032                 # select swop field
1033                 $old = $swop = $ref->{to} if $tfield eq 'T';
1034                 $old = $swop = $ref->{from} if $tfield eq 'F';
1035                 $old = $swop = $ref->{origin} if $tfield eq 'O';
1036                 $old = $swop = $ref->{subject} if $tfield eq 'S';
1037
1038                 if ($tested =~ m{$pattern}i) {
1039                         if ($tested eq $swop) {
1040                                 $swop =~ s{$pattern}{$topattern}i;
1041                         } else {
1042                                 $swop = $topattern;
1043                         }
1044                         Log('msg', "Msg $ref->{msgno}: $tfield $old -> $swop");
1045                         Log('dbg', "Msg $ref->{msgno}: $tfield $old -> $swop");
1046                         $ref->{to} = $swop if $tfield eq 'T';
1047                         $ref->{from} = $swop if $tfield eq 'F';
1048                         $ref->{origin} = $swop if $tfield eq 'O';
1049                         $ref->{subject} = $swop if $tfield eq 'S';
1050                         ++$count;
1051                 }
1052         }
1053         return $count;
1054 }
1055
1056 # import any msgs in the import directory
1057 # the messages are in BBS format (but may have cluster extentions
1058 # so SB UK < GB7TLH is legal
1059 sub import_msgs
1060 {
1061         # are there any to do in this directory?
1062         return unless -d $importfn;
1063         unless (opendir(DIR, $importfn)) {
1064                 dbg('msg', "can\'t open $importfn $!");
1065                 Log('msg', "can\'t open $importfn $!");
1066                 return;
1067         } 
1068
1069         my @names = readdir(DIR);
1070         closedir(DIR);
1071         my $name;
1072         foreach $name (@names) {
1073                 next if $name =~ /^\./;
1074                 my $splitit = $name =~ /^split/;
1075                 my $fn = "$importfn/$name";
1076                 next unless -f $fn;
1077                 unless (open(MSG, $fn)) {
1078                         dbg('msg', "can\'t open import file $fn $!");
1079                         Log('msg', "can\'t open import file $fn $!");
1080                         unlink($fn);
1081                         next;
1082                 }
1083                 my @msg = map { chomp; $_ } <MSG>;
1084                 close(MSG);
1085                 unlink($fn);
1086                 my @out = import_one($DXProt::me, \@msg, $splitit);
1087                 Log('msg', @out);
1088         }
1089 }
1090
1091 # import one message as a list in bbs (as extended) mode
1092 # takes a reference to an array containing the whole message
1093 sub import_one
1094 {
1095         my $dxchan = shift;
1096         my $ref = shift;
1097         my $splitit = shift;
1098         my $private = '1';
1099         my $rr = '0';
1100         my $notincalls = 1;
1101         my $from = $dxchan->call;
1102         my $origin = $main::mycall;
1103         my @to;
1104         my @out;
1105                                 
1106         # first line;
1107         my $line = shift @$ref;
1108         my @f = split /\s+/, $line;
1109         unless (@f && $f[0] =~ /^(:?S|SP|SB|SEND)$/ ) {
1110                 my $m = "invalid first line in import '$line'";
1111                 dbg('MSG', $m );
1112                 return (1, $m);
1113         }
1114         while (@f) {
1115                 my $f = uc shift @f;
1116                 next if $f eq 'SEND';
1117
1118                 # private / noprivate / rr
1119                 if ($notincalls && ($f eq 'B' || $f eq 'SB' || $f =~ /^NOP/oi)) {
1120                         $private = '0';
1121                 } elsif ($notincalls && ($f eq 'P' || $f eq 'SP' || $f =~ /^PRI/oi)) {
1122                         ;
1123                 } elsif ($notincalls && ($f eq 'RR')) {
1124                         $rr = '1';
1125                 } elsif ($f eq '@' && @f) {       # this is bbs syntax, for origin
1126                         $origin = uc shift @f;
1127                 } elsif ($f eq '<' && @f) {     # this is bbs syntax  for from call
1128                         $from = uc shift @f;
1129                 } elsif ($f =~ /^\$/) {     # this is bbs syntax  for a bid
1130                         next;
1131                 } elsif ($f =~ /^<\S+/) {     # this is bbs syntax  for from call
1132                         ($from) = $f =~ /^<(\S+)$/;
1133                 } elsif ($f =~ /^\@\S+/) {     # this is bbs syntax for origin
1134                         ($origin) = $f =~ /^\@(\S+)$/;
1135                 } else {
1136
1137                         # callsign ?
1138                         $notincalls = 0;
1139
1140                         # is this callsign a distro?
1141                         my $fn = "$msgdir/distro/$f.pl";
1142                         if (-e $fn) {
1143                                 my $fh = new IO::File $fn;
1144                                 if ($fh) {
1145                                         local $/ = undef;
1146                                         my $s = <$fh>;
1147                                         $fh->close;
1148                                         my @call;
1149                                         @call = eval $s;
1150                                         return (1, "Error in Distro $f.pl:", $@) if $@;
1151                                         if (@call > 0) {
1152                                                 push @f, @call;
1153                                                 next;
1154                                         }
1155                                 }
1156                         }
1157                         
1158                         if (grep $_ eq $f, @DXMsg::badmsg) {
1159                                 push @out, $dxchan->msg('m3', $f);
1160                         } else {
1161                                 push @to, $f;
1162                         }
1163                 }
1164         }
1165         
1166         # subject is the next line
1167         my $subject = shift @$ref;
1168         
1169         # strip off trailing lines 
1170         pop @$ref while (@$ref && $$ref[-1] =~ /^\s*$/);
1171         
1172         # strip off /EX or /ABORT
1173         return ("aborted") if @$ref && $$ref[-1] =~ m{^/ABORT$}i; 
1174         pop @$ref if (@$ref && $$ref[-1] =~ m{^/EX$}i);                                                                  
1175
1176         # sort out any splitting that needs to be done
1177         my @chunk;
1178         if ($splitit) {
1179                 my $lth = 0;
1180                 my $lines = [];
1181                 for (@$ref) {
1182                         if ($lth >= $maxchunk || ($lth > $minchunk && /^\s*$/)) {
1183                                 push @chunk, $lines;
1184                                 $lines = [];
1185                                 $lth = 0;
1186                         } 
1187                         push @$lines, $_;
1188                         $lth += length; 
1189                 }
1190                 push @chunk, $lines if @$lines;
1191         } else {
1192                 push @chunk, $ref;
1193         }
1194                                   
1195     # write all the messages away
1196         my $i;
1197         for ( $i = 0;  $i < @chunk; $i++) {
1198                 my $chunk = $chunk[$i];
1199                 my $ch_subject;
1200                 if (@chunk > 1) {
1201                         my $num = " [" . ($i+1) . "/" . scalar @chunk . "]";
1202                         $ch_subject = substr($subject, 0, 27 - length $num) .  $num;
1203                 } else {
1204                         $ch_subject = $subject;
1205                 }
1206                 my $to;
1207                 foreach $to (@to) {
1208                         my $systime = $main::systime;
1209                         my $mycall = $main::mycall;
1210                         my $mref = DXMsg->alloc(DXMsg::next_transno('Msgno'),
1211                                                                         $to,
1212                                                                         $from, 
1213                                                                         $systime,
1214                                                                         $private, 
1215                                                                         $ch_subject, 
1216                                                                         $origin,
1217                                                                         '0',
1218                                                                         $rr);
1219                         $mref->swop_it($main::mycall);
1220                         $mref->store($chunk);
1221                         $mref->add_dir();
1222                         push @out, $dxchan->msg('m11', $mref->{msgno}, $to);
1223                         #push @out, "msgno $ref->{msgno} sent to $to";
1224                         my $todxchan = DXChannel->get(uc $to);
1225                         if ($todxchan) {
1226                                 if ($todxchan->is_user()) {
1227                                         $todxchan->send($todxchan->msg('m9'));
1228                                 }
1229                         }
1230                 }
1231         }
1232         return @out;
1233 }
1234
1235 no strict;
1236 sub AUTOLOAD
1237 {
1238         my $self = shift;
1239         my $name = $AUTOLOAD;
1240         return if $name =~ /::DESTROY$/;
1241         $name =~ s/.*:://o;
1242         
1243         confess "Non-existant field '$AUTOLOAD'" if !$valid{$name};
1244         # this clever line of code creates a subroutine which takes over from autoload
1245         # from OO Perl - Conway
1246         *{$AUTOLOAD} = sub {@_ > 1 ? $_[0]->{$name} = $_[1] : $_[0]->{$name}} ;
1247         @_ ? $self->{$name} = shift : $self->{$name} ;
1248 }
1249
1250 1;
1251
1252 __END__