added Ids and changed the name of DXConnect to DXChannel
[spider.git] / perl / Msg.pm
1 #
2 # This has been taken from the 'Advanced Perl Programming' book by Sriram Srinivasan 
3 #
4 # I am presuming that the code is distributed on the same basis as perl itself.
5 #
6 # I have modified it to suit my devious purposes (Dirk Koopman G1TLH)
7 #
8 # $Id$
9 #
10
11 package Msg;
12
13 require Exporter;
14 @ISA = qw(Exporter);
15
16 use strict;
17 use IO::Select;
18 use IO::Socket;
19 use Carp;
20
21 use vars qw (%rd_callbacks %wt_callbacks $rd_handles $wt_handles);
22
23 %rd_callbacks = ();
24 %wt_callbacks = ();
25 $rd_handles   = IO::Select->new();
26 $wt_handles   = IO::Select->new();
27 my $blocking_supported = 0;
28
29 BEGIN {
30     # Checks if blocking is supported
31     eval {
32         require POSIX; POSIX->import(qw (F_SETFL O_NONBLOCK EAGAIN));
33     };
34     $blocking_supported = 1 unless $@;
35 }
36
37 #-----------------------------------------------------------------
38 # Send side routines
39 sub connect {
40     my ($pkg, $to_host, $to_port,$rcvd_notification_proc) = @_;
41     
42     # Create a new internet socket
43     
44     my $sock = IO::Socket::INET->new (
45                                       PeerAddr => $to_host,
46                                       PeerPort => $to_port,
47                                       Proto    => 'tcp',
48                                       Reuse    => 1);
49
50     return undef unless $sock;
51
52     # Create a connection end-point object
53     my $conn = {
54         sock                   => $sock,
55         rcvd_notification_proc => $rcvd_notification_proc,
56     };
57     
58     if ($rcvd_notification_proc) {
59         my $callback = sub {_rcv($conn, 0)};
60         set_event_handler ($sock, "read" => $callback);
61     }
62     return bless $conn, $pkg;
63 }
64
65 sub disconnect {
66     my $conn = shift;
67     my $sock = delete $conn->{sock};
68     return unless defined($sock);
69     set_event_handler ($sock, "read" => undef, "write" => undef);
70     close($sock);
71 }
72
73 sub send_now {
74     my ($conn, $msg) = @_;
75     _enqueue ($conn, $msg);
76     $conn->_send (1); # 1 ==> flush
77 }
78
79 sub send_later {
80     my ($conn, $msg) = @_;
81     _enqueue($conn, $msg);
82     my $sock = $conn->{sock};
83     return unless defined($sock);
84     set_event_handler ($sock, "write" => sub {$conn->_send(0)});
85 }
86
87 sub _enqueue {
88     my ($conn, $msg) = @_;
89     # prepend length (encoded as network long)
90     my $len = length($msg);
91     $msg = pack ('N', $len) . $msg; 
92     push (@{$conn->{queue}}, $msg);
93 }
94
95 sub _send {
96     my ($conn, $flush) = @_;
97     my $sock = $conn->{sock};
98     return unless defined($sock);
99     my ($rq) = $conn->{queue};
100
101     # If $flush is set, set the socket to blocking, and send all
102     # messages in the queue - return only if there's an error
103     # If $flush is 0 (deferred mode) make the socket non-blocking, and
104     # return to the event loop only after every message, or if it
105     # is likely to block in the middle of a message.
106
107     $flush ? $conn->set_blocking() : $conn->set_non_blocking();
108     my $offset = (exists $conn->{send_offset}) ? $conn->{send_offset} : 0;
109
110     while (@$rq) {
111         my $msg            = $rq->[0];
112         my $bytes_to_write = length($msg) - $offset;
113         my $bytes_written  = 0;
114         while ($bytes_to_write) {
115             $bytes_written = syswrite ($sock, $msg,
116                                        $bytes_to_write, $offset);
117             if (!defined($bytes_written)) {
118                 if (_err_will_block($!)) {
119                     # Should happen only in deferred mode. Record how
120                     # much we have already sent.
121                     $conn->{send_offset} = $offset;
122                     # Event handler should already be set, so we will
123                     # be called back eventually, and will resume sending
124                     return 1;
125                 } else {    # Uh, oh
126                     $conn->handle_send_err($!);
127                     return 0; # fail. Message remains in queue ..
128                 }
129             }
130             $offset         += $bytes_written;
131             $bytes_to_write -= $bytes_written;
132         }
133         delete $conn->{send_offset};
134         $offset = 0;
135         shift @$rq;
136         last unless $flush; # Go back to select and wait
137                             # for it to fire again.
138     }
139     # Call me back if queue has not been drained.
140     if (@$rq) {
141         set_event_handler ($sock, "write" => sub {$conn->_send(0)});
142     } else {
143         set_event_handler ($sock, "write" => undef);
144     }
145     1;  # Success
146 }
147
148 sub _err_will_block {
149     if ($blocking_supported) {
150         return ($_[0] == EAGAIN());
151     }
152     return 0;
153 }
154 sub set_non_blocking {                        # $conn->set_blocking
155     if ($blocking_supported) {
156         # preserve other fcntl flags
157         my $flags = fcntl ($_[0], F_GETFL(), 0);
158         fcntl ($_[0], F_SETFL(), $flags | O_NONBLOCK());
159     }
160 }
161 sub set_blocking {
162     if ($blocking_supported) {
163         my $flags = fcntl ($_[0], F_GETFL(), 0);
164         $flags  &= ~O_NONBLOCK(); # Clear blocking, but preserve other flags
165         fcntl ($_[0], F_SETFL(), $flags);
166     }
167 }
168 sub handle_send_err {
169    # For more meaningful handling of send errors, subclass Msg and
170    # rebless $conn.  
171    my ($conn, $err_msg) = @_;
172    warn "Error while sending: $err_msg \n";
173    set_event_handler ($conn->{sock}, "write" => undef);
174 }
175
176 #-----------------------------------------------------------------
177 # Receive side routines
178
179 my ($g_login_proc,$g_pkg);
180 my $main_socket = 0;
181 sub new_server {
182     @_ == 4 || die "Msg->new_server (myhost, myport, login_proc)\n";
183     my ($pkg, $my_host, $my_port, $login_proc) = @_;
184     
185     $main_socket = IO::Socket::INET->new (
186                                           LocalAddr => $my_host,
187                                           LocalPort => $my_port,
188                                           Listen    => 5,
189                                           Proto     => 'tcp',
190                                           Reuse     => 1);
191     die "Could not create socket: $! \n" unless $main_socket;
192     set_event_handler ($main_socket, "read" => \&_new_client);
193     $g_login_proc = $login_proc; $g_pkg = $pkg;
194 }
195
196 sub rcv_now {
197     my ($conn) = @_;
198     my ($msg, $err) = _rcv ($conn, 1); # 1 ==> rcv now
199     return wantarray ? ($msg, $err) : $msg;
200 }
201
202 sub _rcv {                     # Complement to _send
203     my ($conn, $rcv_now) = @_; # $rcv_now complement of $flush
204     # Find out how much has already been received, if at all
205     my ($msg, $offset, $bytes_to_read, $bytes_read);
206     my $sock = $conn->{sock};
207     return unless defined($sock);
208     if (exists $conn->{msg}) {
209         $msg           = $conn->{msg};
210         $offset        = length($msg) - 1;  # sysread appends to it.
211         $bytes_to_read = $conn->{bytes_to_read};
212         delete $conn->{'msg'};              # have made a copy
213     } else {
214         # The typical case ...
215         $msg           = "";                # Otherwise -w complains 
216         $offset        = 0 ;  
217         $bytes_to_read = 0 ;                # Will get set soon
218     }
219     # We want to read the message length in blocking mode. Quite
220     # unlikely that we'll get blocked too long reading 4 bytes
221     if (!$bytes_to_read)  {                 # Get new length 
222         my $buf;
223         $conn->set_blocking();
224         $bytes_read = sysread($sock, $buf, 4);
225         if ($! || ($bytes_read != 4)) {
226             goto FINISH;
227         }
228         $bytes_to_read = unpack ('N', $buf);
229     }
230     $conn->set_non_blocking() unless $rcv_now;
231     while ($bytes_to_read) {
232         $bytes_read = sysread ($sock, $msg, $bytes_to_read, $offset);
233         if (defined ($bytes_read)) {
234             if ($bytes_read == 0) {
235                 last;
236             }
237             $bytes_to_read -= $bytes_read;
238             $offset        += $bytes_read;
239         } else {
240             if (_err_will_block($!)) {
241                 # Should come here only in non-blocking mode
242                 $conn->{msg}           = $msg;
243                 $conn->{bytes_to_read} = $bytes_to_read;
244                 return ;   # .. _rcv will be called later
245                            # when socket is readable again
246             } else {
247                 last;
248             }
249         }
250     }
251
252   FINISH:
253     if (length($msg) == 0) {
254         $conn->disconnect();
255     }
256     if ($rcv_now) {
257         return ($msg, $!);
258     } else {
259         &{$conn->{rcvd_notification_proc}}($conn, $msg, $!);
260     }
261 }
262
263 sub _new_client {
264     my $sock = $main_socket->accept();
265     my $conn = bless {
266         'sock' =>  $sock,
267         'state' => 'connected'
268     }, $g_pkg;
269     my $rcvd_notification_proc =
270         &$g_login_proc ($conn, $sock->peerhost(), $sock->peerport());
271     if ($rcvd_notification_proc) {
272         $conn->{rcvd_notification_proc} = $rcvd_notification_proc;
273         my $callback = sub {_rcv($conn,0)};
274         set_event_handler ($sock, "read" => $callback);
275     } else {  # Login failed
276         $conn->disconnect();
277     }
278 }
279
280 #----------------------------------------------------
281 # Event loop routines used by both client and server
282
283 sub set_event_handler {
284     shift unless ref($_[0]); # shift if first arg is package name
285     my ($handle, %args) = @_;
286     my $callback;
287     if (exists $args{'write'}) {
288         $callback = $args{'write'};
289         if ($callback) {
290             $wt_callbacks{$handle} = $callback;
291             $wt_handles->add($handle);
292         } else {
293             delete $wt_callbacks{$handle};
294             $wt_handles->remove($handle);
295         }
296     }
297     if (exists $args{'read'}) {
298         $callback = $args{'read'};
299         if ($callback) {
300             $rd_callbacks{$handle} = $callback;
301             $rd_handles->add($handle);
302         } else {
303             delete $rd_callbacks{$handle};
304             $rd_handles->remove($handle);
305        }
306     }
307 }
308
309 sub event_loop {
310     my ($pkg, $loop_count, $timeout) = @_; # event_loop(1) to process events once
311     my ($conn, $r, $w, $rset, $wset);
312     while (1) {
313         # Quit the loop if no handles left to process
314         last unless ($rd_handles->count() || $wt_handles->count());
315         ($rset, $wset) =
316             IO::Select->select ($rd_handles, $wt_handles, undef, $timeout);
317         foreach $r (@$rset) {
318             &{$rd_callbacks{$r}} ($r) if exists $rd_callbacks{$r};
319         }
320         foreach $w (@$wset) {
321             &{$wt_callbacks{$w}}($w) if exists $wt_callbacks{$w};
322         }
323         if (defined($loop_count)) {
324             last unless --$loop_count;
325         }
326     }
327 }
328
329 1;
330
331 __END__
332