Initial version
[spider.git] / perl / Msg.pm
1 #
2 # This has been taken from the 'Advanced Perl Programming' book by Sriram Srinivasan 
3 #
4 # I am presuming that the code is distributed on the same basis as perl itself.
5 #
6 # I have modified it to suit my devious purposes (Dirk Koopman G1TLH)
7 #
8 package Msg;
9
10 require Exporter;
11 @ISA = qw(Exporter);
12
13 use strict;
14 use IO::Select;
15 use IO::Socket;
16 use Carp;
17
18 use vars qw (%rd_callbacks %wt_callbacks $rd_handles $wt_handles);
19
20 %rd_callbacks = ();
21 %wt_callbacks = ();
22 $rd_handles   = IO::Select->new();
23 $wt_handles   = IO::Select->new();
24 my $blocking_supported = 0;
25
26 BEGIN {
27     # Checks if blocking is supported
28     eval {
29         require POSIX; POSIX->import(qw (F_SETFL O_NONBLOCK EAGAIN));
30     };
31     $blocking_supported = 1 unless $@;
32 }
33
34 #-----------------------------------------------------------------
35 # Send side routines
36 sub connect {
37     my ($pkg, $to_host, $to_port,$rcvd_notification_proc) = @_;
38     
39     # Create a new internet socket
40     
41     my $sock = IO::Socket::INET->new (
42                                       PeerAddr => $to_host,
43                                       PeerPort => $to_port,
44                                       Proto    => 'tcp',
45                                       Reuse    => 1);
46
47     return undef unless $sock;
48
49     # Create a connection end-point object
50     my $conn = {
51         sock                   => $sock,
52         rcvd_notification_proc => $rcvd_notification_proc,
53     };
54     
55     if ($rcvd_notification_proc) {
56         my $callback = sub {_rcv($conn, 0)};
57         set_event_handler ($sock, "read" => $callback);
58     }
59     return bless $conn, $pkg;
60 }
61
62 sub disconnect {
63     my $conn = shift;
64     my $sock = delete $conn->{sock};
65     return unless defined($sock);
66     set_event_handler ($sock, "read" => undef, "write" => undef);
67     close($sock);
68 }
69
70 sub send_now {
71     my ($conn, $msg) = @_;
72     _enqueue ($conn, $msg);
73     $conn->_send (1); # 1 ==> flush
74 }
75
76 sub send_later {
77     my ($conn, $msg) = @_;
78     _enqueue($conn, $msg);
79     my $sock = $conn->{sock};
80     return unless defined($sock);
81     set_event_handler ($sock, "write" => sub {$conn->_send(0)});
82 }
83
84 sub _enqueue {
85     my ($conn, $msg) = @_;
86     # prepend length (encoded as network long)
87     my $len = length($msg);
88     $msg = pack ('N', $len) . $msg; 
89     push (@{$conn->{queue}}, $msg);
90 }
91
92 sub _send {
93     my ($conn, $flush) = @_;
94     my $sock = $conn->{sock};
95     return unless defined($sock);
96     my ($rq) = $conn->{queue};
97
98     # If $flush is set, set the socket to blocking, and send all
99     # messages in the queue - return only if there's an error
100     # If $flush is 0 (deferred mode) make the socket non-blocking, and
101     # return to the event loop only after every message, or if it
102     # is likely to block in the middle of a message.
103
104     $flush ? $conn->set_blocking() : $conn->set_non_blocking();
105     my $offset = (exists $conn->{send_offset}) ? $conn->{send_offset} : 0;
106
107     while (@$rq) {
108         my $msg            = $rq->[0];
109         my $bytes_to_write = length($msg) - $offset;
110         my $bytes_written  = 0;
111         while ($bytes_to_write) {
112             $bytes_written = syswrite ($sock, $msg,
113                                        $bytes_to_write, $offset);
114             if (!defined($bytes_written)) {
115                 if (_err_will_block($!)) {
116                     # Should happen only in deferred mode. Record how
117                     # much we have already sent.
118                     $conn->{send_offset} = $offset;
119                     # Event handler should already be set, so we will
120                     # be called back eventually, and will resume sending
121                     return 1;
122                 } else {    # Uh, oh
123                     $conn->handle_send_err($!);
124                     return 0; # fail. Message remains in queue ..
125                 }
126             }
127             $offset         += $bytes_written;
128             $bytes_to_write -= $bytes_written;
129         }
130         delete $conn->{send_offset};
131         $offset = 0;
132         shift @$rq;
133         last unless $flush; # Go back to select and wait
134                             # for it to fire again.
135     }
136     # Call me back if queue has not been drained.
137     if (@$rq) {
138         set_event_handler ($sock, "write" => sub {$conn->_send(0)});
139     } else {
140         set_event_handler ($sock, "write" => undef);
141     }
142     1;  # Success
143 }
144
145 sub _err_will_block {
146     if ($blocking_supported) {
147         return ($_[0] == EAGAIN());
148     }
149     return 0;
150 }
151 sub set_non_blocking {                        # $conn->set_blocking
152     if ($blocking_supported) {
153         # preserve other fcntl flags
154         my $flags = fcntl ($_[0], F_GETFL(), 0);
155         fcntl ($_[0], F_SETFL(), $flags | O_NONBLOCK());
156     }
157 }
158 sub set_blocking {
159     if ($blocking_supported) {
160         my $flags = fcntl ($_[0], F_GETFL(), 0);
161         $flags  &= ~O_NONBLOCK(); # Clear blocking, but preserve other flags
162         fcntl ($_[0], F_SETFL(), $flags);
163     }
164 }
165 sub handle_send_err {
166    # For more meaningful handling of send errors, subclass Msg and
167    # rebless $conn.  
168    my ($conn, $err_msg) = @_;
169    warn "Error while sending: $err_msg \n";
170    set_event_handler ($conn->{sock}, "write" => undef);
171 }
172
173 #-----------------------------------------------------------------
174 # Receive side routines
175
176 my ($g_login_proc,$g_pkg);
177 my $main_socket = 0;
178 sub new_server {
179     @_ == 4 || die "Msg->new_server (myhost, myport, login_proc)\n";
180     my ($pkg, $my_host, $my_port, $login_proc) = @_;
181     
182     $main_socket = IO::Socket::INET->new (
183                                           LocalAddr => $my_host,
184                                           LocalPort => $my_port,
185                                           Listen    => 5,
186                                           Proto     => 'tcp',
187                                           Reuse     => 1);
188     die "Could not create socket: $! \n" unless $main_socket;
189     set_event_handler ($main_socket, "read" => \&_new_client);
190     $g_login_proc = $login_proc; $g_pkg = $pkg;
191 }
192
193 sub rcv_now {
194     my ($conn) = @_;
195     my ($msg, $err) = _rcv ($conn, 1); # 1 ==> rcv now
196     return wantarray ? ($msg, $err) : $msg;
197 }
198
199 sub _rcv {                     # Complement to _send
200     my ($conn, $rcv_now) = @_; # $rcv_now complement of $flush
201     # Find out how much has already been received, if at all
202     my ($msg, $offset, $bytes_to_read, $bytes_read);
203     my $sock = $conn->{sock};
204     return unless defined($sock);
205     if (exists $conn->{msg}) {
206         $msg           = $conn->{msg};
207         $offset        = length($msg) - 1;  # sysread appends to it.
208         $bytes_to_read = $conn->{bytes_to_read};
209         delete $conn->{'msg'};              # have made a copy
210     } else {
211         # The typical case ...
212         $msg           = "";                # Otherwise -w complains 
213         $offset        = 0 ;  
214         $bytes_to_read = 0 ;                # Will get set soon
215     }
216     # We want to read the message length in blocking mode. Quite
217     # unlikely that we'll get blocked too long reading 4 bytes
218     if (!$bytes_to_read)  {                 # Get new length 
219         my $buf;
220         $conn->set_blocking();
221         $bytes_read = sysread($sock, $buf, 4);
222         if ($! || ($bytes_read != 4)) {
223             goto FINISH;
224         }
225         $bytes_to_read = unpack ('N', $buf);
226     }
227     $conn->set_non_blocking() unless $rcv_now;
228     while ($bytes_to_read) {
229         $bytes_read = sysread ($sock, $msg, $bytes_to_read, $offset);
230         if (defined ($bytes_read)) {
231             if ($bytes_read == 0) {
232                 last;
233             }
234             $bytes_to_read -= $bytes_read;
235             $offset        += $bytes_read;
236         } else {
237             if (_err_will_block($!)) {
238                 # Should come here only in non-blocking mode
239                 $conn->{msg}           = $msg;
240                 $conn->{bytes_to_read} = $bytes_to_read;
241                 return ;   # .. _rcv will be called later
242                            # when socket is readable again
243             } else {
244                 last;
245             }
246         }
247     }
248
249   FINISH:
250     if (length($msg) == 0) {
251         $conn->disconnect();
252     }
253     if ($rcv_now) {
254         return ($msg, $!);
255     } else {
256         &{$conn->{rcvd_notification_proc}}($conn, $msg, $!);
257     }
258 }
259
260 sub _new_client {
261     my $sock = $main_socket->accept();
262     my $conn = bless {
263         'sock' =>  $sock,
264         'state' => 'connected'
265     }, $g_pkg;
266     my $rcvd_notification_proc =
267         &$g_login_proc ($conn, $sock->peerhost(), $sock->peerport());
268     if ($rcvd_notification_proc) {
269         $conn->{rcvd_notification_proc} = $rcvd_notification_proc;
270         my $callback = sub {_rcv($conn,0)};
271         set_event_handler ($sock, "read" => $callback);
272     } else {  # Login failed
273         $conn->disconnect();
274     }
275 }
276
277 #----------------------------------------------------
278 # Event loop routines used by both client and server
279
280 sub set_event_handler {
281     shift unless ref($_[0]); # shift if first arg is package name
282     my ($handle, %args) = @_;
283     my $callback;
284     if (exists $args{'write'}) {
285         $callback = $args{'write'};
286         if ($callback) {
287             $wt_callbacks{$handle} = $callback;
288             $wt_handles->add($handle);
289         } else {
290             delete $wt_callbacks{$handle};
291             $wt_handles->remove($handle);
292         }
293     }
294     if (exists $args{'read'}) {
295         $callback = $args{'read'};
296         if ($callback) {
297             $rd_callbacks{$handle} = $callback;
298             $rd_handles->add($handle);
299         } else {
300             delete $rd_callbacks{$handle};
301             $rd_handles->remove($handle);
302        }
303     }
304 }
305
306 sub event_loop {
307     my ($pkg, $loop_count, $timeout) = @_; # event_loop(1) to process events once
308     my ($conn, $r, $w, $rset, $wset);
309     while (1) {
310         # Quit the loop if no handles left to process
311         last unless ($rd_handles->count() || $wt_handles->count());
312         ($rset, $wset) =
313             IO::Select->select ($rd_handles, $wt_handles, undef, $timeout);
314         foreach $r (@$rset) {
315             &{$rd_callbacks{$r}} ($r) if exists $rd_callbacks{$r};
316         }
317         foreach $w (@$wset) {
318             &{$wt_callbacks{$w}}($w) if exists $wt_callbacks{$w};
319         }
320         if (defined($loop_count)) {
321             last unless --$loop_count;
322         }
323     }
324 }
325
326 1;
327
328 __END__
329