6805e008a770368b33e801c127fc721fc51187bf
[spider.git] / perl / Msg.pm
1 #
2 # This has been taken from the 'Advanced Perl Programming' book by Sriram Srinivasan 
3 #
4 # I am presuming that the code is distributed on the same basis as perl itself.
5 #
6 # I have modified it to suit my devious purposes (Dirk Koopman G1TLH)
7 #
8 #
9 #
10
11 package Msg;
12
13 use strict;
14
15 use DXUtil;
16
17 use IO::Select;
18 use DXDebug;
19 use Timer;
20
21 use vars qw(%rd_callbacks %wt_callbacks %er_callbacks $rd_handles $wt_handles $er_handles $now %conns $noconns $blocking_supported $cnum $total_in $total_out $io_socket);
22
23 %rd_callbacks = ();
24 %wt_callbacks = ();
25 %er_callbacks = ();
26 $rd_handles   = IO::Select->new();
27 $wt_handles   = IO::Select->new();
28 $er_handles   = IO::Select->new();
29 $total_in = $total_out = 0;
30
31 $now = time;
32
33 BEGIN {
34     # Checks if blocking is supported
35     eval {
36                 local $^W;
37         require POSIX; POSIX->import(qw(O_NONBLOCK F_SETFL F_GETFL))
38     };
39
40         eval {
41                 local $^W;
42                 require IO::Socket::INET6;
43         };
44
45         if ($@) {
46                 dbg($@);
47                 require IO::Socket;
48                 $io_socket = 'IO::Socket::INET';
49         } else {
50                 $io_socket = 'IO::Socket::INET6';
51         }
52         $io_socket->import;
53
54         if ($@ || $main::is_win) {
55                 $blocking_supported = $io_socket->can('blocking') ? 2 : 0;
56         } else {
57                 $blocking_supported = $io_socket->can('blocking') ? 2 : 1;
58         }
59
60
61         # import as many of these errno values as are available
62         eval {
63                 local $^W;
64                 require Errno; Errno->import(qw(EAGAIN EINPROGRESS EWOULDBLOCK));
65         };
66
67         unless ($^O eq 'MSWin32') {
68                 if ($] >= 5.6) {
69                         eval {
70                                 local $^W;
71                                 require Socket; Socket->import(qw(IPPROTO_TCP TCP_NODELAY));
72                         };
73                 } else {
74                         dbg("IPPROTO_TCP and TCP_NODELAY manually defined");
75                         eval 'sub IPPROTO_TCP {     6 };';
76                         eval 'sub TCP_NODELAY {     1 };';
77                 }
78         }
79         # http://support.microsoft.com/support/kb/articles/Q150/5/37.asp
80         # defines EINPROGRESS as 10035.  We provide it here because some
81         # Win32 users report POSIX::EINPROGRESS is not vendor-supported.
82         if ($^O eq 'MSWin32') { 
83                 eval '*EINPROGRESS = sub { 10036 };' unless defined *EINPROGRESS;
84                 eval '*EWOULDBLOCK = *EAGAIN = sub { 10035 };' unless defined *EWOULDBLOCK;
85                 eval '*F_GETFL     = sub {     0 };' unless defined *F_GETFL;
86                 eval '*F_SETFL     = sub {     0 };' unless defined *F_SETFL;
87                 eval 'sub IPPROTO_TCP  {     6 };';
88                 eval 'sub TCP_NODELAY  {     1 };';
89                 $blocking_supported = 0;   # it appears that this DOESN'T work :-(
90         } 
91 }
92
93 my $w = $^W;
94 $^W = 0;
95 my $eagain = eval {EAGAIN()};
96 my $einprogress = eval {EINPROGRESS()};
97 my $ewouldblock = eval {EWOULDBLOCK()};
98 $^W = $w;
99 $cnum = 0;
100
101
102 #
103 #-----------------------------------------------------------------
104 # Generalised initializer
105
106 sub new
107 {
108     my ($pkg, $rproc) = @_;
109         my $obj = ref($pkg);
110         my $class = $obj || $pkg;
111
112     my $conn = {
113         rproc => $rproc,
114                 inqueue => [],
115                 outqueue => [],
116                 state => 0,
117                 lineend => "\r\n",
118                 csort => 'telnet',
119                 timeval => 60,
120                 blocking => 0,
121                 cnum => (($cnum < 999) ? (++$cnum) : ($cnum = 1)),
122     };
123
124         $noconns++;
125         
126         dbg("Connection created ($noconns)") if isdbg('connll');
127         return bless $conn, $class;
128 }
129
130 sub set_error
131 {
132         my $conn = shift;
133         my $callback = shift;
134         $conn->{eproc} = $callback;
135         set_event_handler($conn->{sock}, error => $callback) if exists $conn->{sock};
136 }
137
138 sub set_rproc
139 {
140         my $conn = shift;
141         my $callback = shift;
142         $conn->{rproc} = $callback;
143 }
144
145 sub blocking
146 {
147         return unless $blocking_supported;
148
149         # Make the handle stop blocking, the Windows way.
150         if ($blocking_supported) { 
151                 $_[0]->blocking($_[1]);
152         } else {
153                 my $flags = fcntl ($_[0], F_GETFL, 0);
154                 if ($_[1]) {
155                         $flags &= ~O_NONBLOCK;
156                 } else {
157                         $flags |= O_NONBLOCK;
158                 }
159                 fcntl ($_[0], F_SETFL, $flags);
160         }
161 }
162
163 # save it
164 sub conns
165 {
166         my $pkg = shift;
167         my $call = shift;
168         my $ref;
169         
170         if (ref $pkg) {
171                 $call = $pkg->{call} unless $call;
172                 return undef unless $call;
173                 dbg("changing $pkg->{call} to $call") if isdbg('connll') && exists $pkg->{call} && $call ne $pkg->{call};
174                 delete $conns{$pkg->{call}} if exists $pkg->{call} && exists $conns{$pkg->{call}} && $pkg->{call} ne $call; 
175                 $pkg->{call} = $call;
176                 $ref = $conns{$call} = $pkg;
177                 dbg("Connection $pkg->{cnum} $call stored") if isdbg('connll');
178         } else {
179                 $ref = $conns{$call};
180         }
181         return $ref;
182 }
183
184 # this is only called by any dependent processes going away unexpectedly
185 sub pid_gone
186 {
187         my ($pkg, $pid) = @_;
188         
189         my @pid = grep {$_->{pid} == $pid} values %conns;
190         foreach my $p (@pid) {
191                 &{$p->{eproc}}($p, "$pid has gorn") if exists $p->{eproc};
192                 $p->disconnect;
193         }
194 }
195
196 sub ax25
197 {
198         my $conn = shift;
199         return $conn->{csort} eq 'ax25';
200 }
201
202 sub peerhost
203 {
204         my $conn = shift;
205         $conn->{peerhost} ||= 'ax25' if $conn->ax25;
206         $conn->{peerhost} ||= $conn->{sock}->peerhost if $conn->{sock} && $conn->{sock}->isa('IO::Socket::INET');
207         $conn->{peerhost} ||= 'UNKNOWN';
208         return $conn->{peerhost};
209 }
210
211 #-----------------------------------------------------------------
212 # Send side routines
213 sub connect {
214     my ($pkg, $to_host, $to_port, $rproc) = @_;
215
216     # Create a connection end-point object
217     my $conn = $pkg;
218         unless (ref $pkg) {
219                 $conn = $pkg->new($rproc);
220         }
221         $conn->{peerhost} = $to_host;
222         $conn->{peerport} = $to_port;
223         $conn->{sort} = 'Outgoing';
224         
225         my $sock;
226         if ($blocking_supported) {
227                 $sock = $io_socket->new(PeerAddr => $to_host, PeerPort => $to_port, Proto => 'tcp', Blocking =>0);
228         } else {
229                 # Create a new internet socket
230                 my $sock = $io_socket->new();
231                 return undef unless $sock;
232
233                 my $proto = getprotobyname('tcp');
234                 $sock->socket(AF_INET, SOCK_STREAM, $proto) or return undef;
235
236                 blocking($sock, 0);
237                 $conn->{blocking} = 0;
238
239                 # does the host resolve?
240                 my $ip = gethostbyname($to_host);
241                 return undef unless $ip;
242
243                 my $r = connect($sock, pack_sockaddr_in($to_port, $ip));
244                 return undef unless $r || _err_will_block($!);
245         }
246         
247         $conn->{sock} = $sock;
248         $conn->{peerhost} = $sock->peerhost;    # for consistency
249
250     if ($conn->{rproc}) {
251         my $callback = sub {$conn->_rcv};
252         set_event_handler ($sock, read => $callback);
253     }
254     return $conn;
255 }
256
257 sub start_program
258 {
259         my ($conn, $line, $sort) = @_;
260         my $pid;
261         
262         local $^F = 10000;              # make sure it ain't closed on exec
263         my ($a, $b) = $io_socket->socketpair(AF_UNIX, SOCK_STREAM, PF_UNSPEC);
264         if ($a && $b) {
265                 $a->autoflush(1);
266                 $b->autoflush(1);
267                 $pid = fork;
268                 if (defined $pid) {
269                         if ($pid) {
270                                 close $b;
271                                 $conn->{sock} = $a;
272                                 $conn->{csort} = $sort;
273                                 $conn->{lineend} = "\cM" if $sort eq 'ax25';
274                                 $conn->{pid} = $pid;
275                                 if ($conn->{rproc}) {
276                                         my $callback = sub {$conn->_rcv};
277                                         Msg::set_event_handler ($a, read => $callback);
278                                 }
279                                 dbg("connect $conn->{cnum}: started pid: $conn->{pid} as $line") if isdbg('connect');
280                         } else {
281                                 $^W = 0;
282                                 dbgclose();
283                                 STDIN->close;
284                                 STDOUT->close;
285                                 STDOUT->close;
286                                 *STDIN = IO::File->new_from_fd($b, 'r') or die;
287                                 *STDOUT = IO::File->new_from_fd($b, 'w') or die;
288                                 *STDERR = IO::File->new_from_fd($b, 'w') or die;
289                                 close $a;
290                                 unless ($main::is_win) {
291                                         #                                               $SIG{HUP} = 'IGNORE';
292                                         $SIG{HUP} = $SIG{CHLD} = $SIG{TERM} = $SIG{INT} = 'DEFAULT';
293                                         alarm(0);
294                                 }
295                                 exec "$line" or dbg("exec '$line' failed $!");
296                         } 
297                 } else {
298                         dbg("cannot fork for $line");
299                 }
300         } else {
301                 dbg("no socket pair $! for $line");
302         }
303         return $pid;
304 }
305
306 sub disconnect 
307 {
308     my $conn = shift;
309         return if exists $conn->{disconnecting};
310
311         $conn->{disconnecting} = 1;
312     my $sock = delete $conn->{sock};
313         $conn->{state} = 'E';
314         $conn->{timeout}->del if $conn->{timeout};
315
316         # be careful to delete the correct one
317         my $call;
318         if ($call = $conn->{call}) {
319                 my $ref = $conns{$call};
320                 delete $conns{$call} if $ref && $ref == $conn;
321         }
322         $call ||= 'unallocated';
323         dbg("Connection $conn->{cnum} $call disconnected") if isdbg('connll');
324         
325         # get rid of any references
326         for (keys %$conn) {
327                 if (ref($conn->{$_})) {
328                         delete $conn->{$_};
329                 }
330         }
331
332         if (defined($sock)) {
333                 set_event_handler ($sock, read => undef, write => undef, error => undef);
334                 shutdown($sock, 3);
335                 close($sock);
336         }
337         
338         unless ($main::is_win) {
339                 kill 'TERM', $conn->{pid} if exists $conn->{pid};
340         }
341 }
342
343 sub send_now {
344     my ($conn, $msg) = @_;
345     $conn->enqueue($msg);
346     $conn->_send (1); # 1 ==> flush
347 }
348
349 sub send_later {
350     my ($conn, $msg) = @_;
351     $conn->enqueue($msg);
352     my $sock = $conn->{sock};
353     return unless defined($sock);
354     set_event_handler ($sock, write => sub {$conn->_send(0)});
355 }
356
357 sub enqueue {
358     my $conn = shift;
359     push (@{$conn->{outqueue}}, defined $_[0] ? $_[0] : '');
360 }
361
362 sub _send {
363     my ($conn, $flush) = @_;
364     my $sock = $conn->{sock};
365     return unless defined($sock);
366     my $rq = $conn->{outqueue};
367
368     # If $flush is set, set the socket to blocking, and send all
369     # messages in the queue - return only if there's an error
370     # If $flush is 0 (deferred mode) make the socket non-blocking, and
371     # return to the event loop only after every message, or if it
372     # is likely to block in the middle of a message.
373
374 #       if ($conn->{blocking} != $flush) {
375 #               blocking($sock, $flush);
376 #               $conn->{blocking} = $flush;
377 #       }
378     my $offset = (exists $conn->{send_offset}) ? $conn->{send_offset} : 0;
379
380     while (@$rq) {
381         my $msg            = $rq->[0];
382                 my $mlth           = length($msg);
383         my $bytes_to_write = $mlth - $offset;
384         my $bytes_written  = 0;
385                 confess("Negative Length! msg: '$msg' lth: $mlth offset: $offset") if $bytes_to_write < 0;
386         while ($bytes_to_write > 0) {
387             $bytes_written = syswrite ($sock, $msg,
388                                        $bytes_to_write, $offset);
389             if (!defined($bytes_written)) {
390                 if (_err_will_block($!)) {
391                     # Should happen only in deferred mode. Record how
392                     # much we have already sent.
393                     $conn->{send_offset} = $offset;
394                     # Event handler should already be set, so we will
395                     # be called back eventually, and will resume sending
396                     return 1;
397                 } else {    # Uh, oh
398                                         &{$conn->{eproc}}($conn, $!) if exists $conn->{eproc};
399                                         $conn->disconnect;
400                     return 0; # fail. Message remains in queue ..
401                 }
402             } elsif (isdbg('raw')) {
403                                 my $call = $conn->{call} || 'none';
404                                 dbgdump('raw', "$call send $bytes_written: ", $msg);
405                         }
406                         $total_out      += $bytes_written;
407             $offset         += $bytes_written;
408             $bytes_to_write -= $bytes_written;
409         }
410         delete $conn->{send_offset};
411         $offset = 0;
412         shift @$rq;
413         #last unless $flush; # Go back to select and wait
414                             # for it to fire again.
415     }
416     # Call me back if queue has not been drained.
417     unless (@$rq) {
418         set_event_handler ($sock, write => undef);
419                 if (exists $conn->{close_on_empty}) {
420                         &{$conn->{eproc}}($conn, undef) if exists $conn->{eproc};
421                         $conn->disconnect; 
422                 }
423     }
424     1;  # Success
425 }
426
427 sub dup_sock
428 {
429         my $conn = shift;
430         my $oldsock = $conn->{sock};
431         my $rc = $rd_callbacks{$oldsock};
432         my $wc = $wt_callbacks{$oldsock};
433         my $ec = $er_callbacks{$oldsock};
434         my $sock = $oldsock->new_from_fd($oldsock, "w+");
435         if ($sock) {
436                 set_event_handler($oldsock, read=>undef, write=>undef, error=>undef);
437                 $conn->{sock} = $sock;
438                 set_event_handler($sock, read=>$rc, write=>$wc, error=>$ec);
439                 $oldsock->close;
440         }
441 }
442
443 sub _err_will_block {
444         return 0 unless $blocking_supported;
445         return ($_[0] == $eagain || $_[0] == $ewouldblock || $_[0] == $einprogress);
446 }
447
448 sub close_on_empty
449 {
450         my $conn = shift;
451         $conn->{close_on_empty} = 1;
452 }
453
454 #-----------------------------------------------------------------
455 # Receive side routines
456
457 sub new_server {
458     @_ == 4 || die "Msg->new_server (myhost, myport, login_proc\n";
459     my ($pkg, $my_host, $my_port, $login_proc) = @_;
460         my $self = $pkg->new($login_proc);
461         
462     $self->{sock} = $io_socket->new (
463                                           LocalAddr => "$my_host:$my_port",
464 #                                          LocalPort => $my_port,
465                                           Listen    => SOMAXCONN,
466                                           Proto     => 'tcp',
467                                           Reuse => 1);
468     die "Could not create socket: $! \n" unless $self->{sock};
469     set_event_handler ($self->{sock}, read => sub { $self->new_client }  );
470         return $self;
471 }
472
473
474 sub nolinger
475 {
476         my $conn = shift;
477
478         unless ($main::is_win) {
479                 if (isdbg('sock')) {
480                         my ($l, $t) = unpack "ll", getsockopt($conn->{sock}, SOL_SOCKET, SO_LINGER); 
481                         my $k = unpack 'l', getsockopt($conn->{sock}, SOL_SOCKET, SO_KEEPALIVE);
482                         my $n = $main::is_win ? 0 : unpack "l", getsockopt($conn->{sock}, IPPROTO_TCP, TCP_NODELAY);
483                         dbg("Linger is: $l $t, keepalive: $k, nagle: $n");
484                 }
485                 
486                 eval {setsockopt($conn->{sock}, SOL_SOCKET, SO_KEEPALIVE, 1)} or dbg("setsockopt keepalive: $!");
487                 eval {setsockopt($conn->{sock}, SOL_SOCKET, SO_LINGER, pack("ll", 0, 0))} or dbg("setsockopt linger: $!");
488                 eval {setsockopt($conn->{sock}, IPPROTO_TCP, TCP_NODELAY, 1)} or eval {setsockopt($conn->{sock}, SOL_SOCKET, TCP_NODELAY, 1)} or dbg("setsockopt tcp_nodelay: $!");
489                 $conn->{sock}->autoflush(0);
490
491                 if (isdbg('sock')) {
492                         my ($l, $t) = unpack "ll", getsockopt($conn->{sock}, SOL_SOCKET, SO_LINGER); 
493                         my $k = unpack 'l', getsockopt($conn->{sock}, SOL_SOCKET, SO_KEEPALIVE);
494                         my $n = $main::is_win ? 0 : unpack "l", getsockopt($conn->{sock}, IPPROTO_TCP, TCP_NODELAY);
495                         dbg("Linger is: $l $t, keepalive: $k, nagle: $n");
496                 }
497         } 
498 }
499
500 sub dequeue
501 {
502         my $conn = shift;
503
504         if ($conn->{msg} =~ /\n/) {
505                 my @lines = split /\r?\n/, $conn->{msg};
506                 if ($conn->{msg} =~ /\n$/) {
507                         delete $conn->{msg};
508                 } else {
509                         $conn->{msg} = pop @lines;
510                 }
511                 for (@lines) {
512                         &{$conn->{rproc}}($conn, defined $_ ? $_ : '');
513                 }
514         }
515 }
516
517 sub _rcv {                     # Complement to _send
518     my $conn = shift; # $rcv_now complement of $flush
519     # Find out how much has already been received, if at all
520     my ($msg, $offset, $bytes_to_read, $bytes_read);
521     my $sock = $conn->{sock};
522     return unless defined($sock);
523
524         my @lines;
525 #       if ($conn->{blocking}) {
526 #               blocking($sock, 0);
527 #               $conn->{blocking} = 0;
528 #       }
529         $bytes_read = sysread ($sock, $msg, 1024, 0);
530         if (defined ($bytes_read)) {
531                 if ($bytes_read > 0) {
532                         $total_in += $bytes_read;
533                         if (isdbg('raw')) {
534                                 my $call = $conn->{call} || 'none';
535                                 dbgdump('raw', "$call read $bytes_read: ", $msg);
536                         }
537                         if ($conn->{echo}) {
538                                 my @ch = split //, $msg;
539                                 my $out;
540                                 for (@ch) {
541                                         if (/[\cH\x7f]/) {
542                                                 $out .= "\cH \cH";
543                                                 $conn->{msg} =~ s/.$//;
544                                         } else {
545                                                 $out .= $_;
546                                                 $conn->{msg} .= $_;
547                                         }
548                                 }
549                                 if (defined $out) {
550                                         set_event_handler ($sock, write => sub{$conn->_send(0)});
551                                         push @{$conn->{outqueue}}, $out;
552                                 }
553                         } else {
554                                 $conn->{msg} .= $msg;
555                         }
556                 } 
557         } else {
558                 if (_err_will_block($!)) {
559                         return ; 
560                 } else {
561                         $bytes_read = 0;
562                 }
563     }
564
565 FINISH:
566     if (defined $bytes_read && $bytes_read == 0) {
567                 &{$conn->{eproc}}($conn, $!) if exists $conn->{eproc};
568                 $conn->disconnect;
569     } else {
570                 unless ($conn->{disable_read}) {
571                         $conn->dequeue if exists $conn->{msg};
572                 }
573         }
574 }
575
576 sub new_client {
577         my $server_conn = shift;
578     my $sock = $server_conn->{sock}->accept();
579         if ($sock) {
580                 my $conn = $server_conn->new($server_conn->{rproc});
581                 $conn->{sock} = $sock;
582                 blocking($sock, 0);
583                 $conn->nolinger;
584                 $conn->{blocking} = 0;
585                 my ($rproc, $eproc) = &{$server_conn->{rproc}} ($conn, $conn->{peerhost} = $sock->peerhost(), $conn->{peerport} = $sock->peerport());
586                 $conn->{sort} = 'Incoming';
587                 if ($eproc) {
588                         $conn->{eproc} = $eproc;
589                         set_event_handler ($sock, error => $eproc);
590                 }
591                 if ($rproc) {
592                         $conn->{rproc} = $rproc;
593                         my $callback = sub {$conn->_rcv};
594                         set_event_handler ($sock, read => $callback);
595                 } else {  # Login failed
596                         &{$conn->{eproc}}($conn, undef) if exists $conn->{eproc};
597                         $conn->disconnect();
598                 }
599         } else {
600                 dbg("Msg: error on accept ($!)") if isdbg('err');
601         }
602 }
603
604 sub close_server
605 {
606         my $conn = shift;
607         set_event_handler ($conn->{sock}, read => undef, write => undef, error => undef );
608         $conn->{sock}->close;
609 }
610
611 # close all clients (this is for forking really)
612 sub close_all_clients
613 {
614         foreach my $conn (values %conns) {
615                 $conn->disconnect;
616         }
617 }
618
619 sub disable_read
620 {
621         my $conn = shift;
622         set_event_handler ($conn->{sock}, read => undef);
623         return $_[0] ? $conn->{disable_read} = $_[0] : $_[0];
624 }
625
626 #
627 #----------------------------------------------------
628 # Event loop routines used by both client and server
629
630 sub set_event_handler {
631     shift unless ref($_[0]); # shift if first arg is package name
632     my ($handle, %args) = @_;
633     my $callback;
634     if (exists $args{'write'}) {
635         $callback = $args{'write'};
636         if ($callback) {
637             $wt_callbacks{$handle} = $callback;
638             $wt_handles->add($handle);
639         } else {
640             delete $wt_callbacks{$handle};
641             $wt_handles->remove($handle);
642         }
643     }
644     if (exists $args{'read'}) {
645         $callback = $args{'read'};
646         if ($callback) {
647             $rd_callbacks{$handle} = $callback;
648             $rd_handles->add($handle);
649         } else {
650             delete $rd_callbacks{$handle};
651             $rd_handles->remove($handle);
652        }
653     }
654     if (exists $args{'error'}) {
655         $callback = $args{'error'};
656         if ($callback) {
657             $er_callbacks{$handle} = $callback;
658             $er_handles->add($handle);
659         } else {
660             delete $er_callbacks{$handle};
661             $er_handles->remove($handle);
662        }
663     }
664 }
665
666 sub event_loop {
667     my ($pkg, $loop_count, $timeout, $wronly) = @_; # event_loop(1) to process events once
668     my ($conn, $r, $w, $e, $rset, $wset, $eset);
669     while (1) {
670  
671        # Quit the loop if no handles left to process
672                 if ($wronly) {
673                         last unless $wt_handles->count();
674         
675                         ($rset, $wset, $eset) = IO::Select->select(undef, $wt_handles, undef, $timeout);
676                         
677                         foreach $w (@$wset) {
678                                 &{$wt_callbacks{$w}}($w) if exists $wt_callbacks{$w};
679                         }
680                 } else {
681                         
682                         last unless ($rd_handles->count() || $wt_handles->count());
683         
684                         ($rset, $wset, $eset) = IO::Select->select($rd_handles, $wt_handles, $er_handles, $timeout);
685                         
686                         foreach $e (@$eset) {
687                                 &{$er_callbacks{$e}}($e) if exists $er_callbacks{$e};
688                         }
689                         foreach $r (@$rset) {
690                                 &{$rd_callbacks{$r}}($r) if exists $rd_callbacks{$r};
691                         }
692                         foreach $w (@$wset) {
693                                 &{$wt_callbacks{$w}}($w) if exists $wt_callbacks{$w};
694                         }
695                 }
696
697                 Timer::handler;
698                 
699         if (defined($loop_count)) {
700             last unless --$loop_count;
701         }
702     }
703 }
704
705 sub sleep
706 {
707         my ($pkg, $interval) = @_;
708         my $now = time;
709         while (time - $now < $interval) {
710                 $pkg->event_loop(10, 0.01);
711         }
712 }
713
714 sub DESTROY
715 {
716         my $conn = shift;
717         my $call = $conn->{call} || 'unallocated';
718         my $host = $conn->{peerhost} || '';
719         my $port = $conn->{peerport} || '';
720         dbg("Connection $conn->{cnum} $call [$host $port] being destroyed") if isdbg('connll');
721         $noconns--;
722 }
723
724 1;
725
726 __END__
727