Initial version
authordjk <djk>
Mon, 15 Jun 1998 23:20:05 +0000 (23:20 +0000)
committerdjk <djk>
Mon, 15 Jun 1998 23:20:05 +0000 (23:20 +0000)
perl/DXConnect.pm [new file with mode: 0644]
perl/DXUser.pm [new file with mode: 0644]
perl/DXUtil.pm [new file with mode: 0644]
perl/DXVars.pm [new file with mode: 0644]
perl/Msg.pm [new file with mode: 0644]
perl/client.pl [new file with mode: 0755]
perl/cluster.pl [new file with mode: 0755]
perl/msgdemo.pl [new file with mode: 0644]
perl/persist.c [new file with mode: 0644]
perl/persistent.pl [new file with mode: 0644]

diff --git a/perl/DXConnect.pm b/perl/DXConnect.pm
new file mode 100644 (file)
index 0000000..0d48676
--- /dev/null
@@ -0,0 +1,62 @@
+#
+# module to manage connection lists & data
+#
+
+package DXConnect;
+
+require Exporter;
+@ISA = qw(Exporter);
+
+%connects = undef;
+
+# create a new connection object [$obj = Connect->new($call, $msg_conn_obj, $user_obj)]
+sub new
+{
+  my ($pkg, $call, $conn, $user) = @_;
+  my $self = {};
+  
+  die "trying to create a duplicate Connect for call $call\n" if $connects{$call};
+  $self->{call} = $call;
+  $self->{conn} = $conn;
+  $self->{user} = $user;
+  $self->{t} = time;
+  $self->{state} = 0;
+  bless $self, $pkg; 
+  return $connects{$call} = $self;
+}
+
+# obtain a connection object by callsign [$obj = Connect->get($call)]
+sub get
+{
+  my ($pkg, $call) = @_;
+  return $connect{$call};
+}
+
+# obtain all the connection objects
+sub get_all
+{
+  my ($pkg) = @_;
+  return values(%connects);
+}
+
+# obtain a connection object by searching for its connection reference
+sub get_by_cnum
+{
+  my ($pkg, $conn) = @_;
+  my $self;
+  
+  foreach $self (values(%connects)) {
+    return $self if ($self->{conn} == $conn);
+  }
+  return undef;
+}
+
+# get rid of a connection object [$obj->del()]
+sub del
+{
+  my $self = shift;
+  delete $connects{$self->{call}};
+}
+
+1;
+__END__;
diff --git a/perl/DXUser.pm b/perl/DXUser.pm
new file mode 100644 (file)
index 0000000..6f7756f
--- /dev/null
@@ -0,0 +1,98 @@
+#
+# DX cluster user routines
+#
+
+package DXUser;
+
+require Exporter;
+@ISA = qw(Exporter);
+
+use MLDBM;
+use Fcntl;
+
+%u = undef;
+$dbm = undef;
+$filename = undef;
+
+#
+# initialise the system
+#
+sub init
+{
+  my ($pkg, $fn) = @_;
+  
+  die "need a filename in User\n" if !$fn;
+  $dbm = tie %u, MLDBM, $fn, O_CREAT|O_RDWR, 0666 or die "can't open user file: $fn ($!)\n";
+  $filename = $fn;
+}
+
+#
+# close the system
+#
+
+sub finish
+{
+  $dbm = undef;
+  untie %u;
+}
+
+#
+# new - create a new user
+#
+
+sub new
+{
+  my ($call) = @_;
+  die "can't create existing call $call in User\n!" if $u{$call};
+
+  my $self = {};
+  $self->{call} = $call;
+  bless $self;
+  $u{call} = $self;
+}
+
+#
+# get - get an existing user
+#
+
+sub get
+{
+  my ($call) = @_;
+  return $u{$call};
+}
+
+#
+# put - put a user
+#
+
+sub put
+{
+  my $self = shift;
+  my $call = $self->{call};
+  $u{$call} = $self;
+}
+
+#
+# del - delete a user
+#
+
+sub del
+{
+  my $self = shift;
+  my $call = $self->{call};
+  delete $u{$call};
+}
+
+#
+# close - close down a user
+#
+
+sub close
+{
+  my $self = shift;
+  $self->{lastin} = time;
+  $self->put();
+}
+
+1;
+__END__
diff --git a/perl/DXUtil.pm b/perl/DXUtil.pm
new file mode 100644 (file)
index 0000000..d7ddcf6
--- /dev/null
@@ -0,0 +1,23 @@
+#
+# various utilities which are exported globally
+#
+
+package DXUtil;
+
+require Exporter;
+@ISA = qw(Exporter);
+@EXPORT = qw(atime
+            );
+
+@month = qw(Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec);
+
+sub atime
+{
+  my ($sec,$min,$hour,$mday,$mon,$year) = gmtime(time);
+  $year += 1900;
+  my $buf = sprintf "%02d%s%04d\@%02d:%02d:%02d", $mday, $month[$mon], $year, $hour, $min, $sec;
+  return $buf;
+}
+
+
+
diff --git a/perl/DXVars.pm b/perl/DXVars.pm
new file mode 100644 (file)
index 0000000..df78142
--- /dev/null
@@ -0,0 +1,71 @@
+#
+# The system variables - those indicated will need to be changed to suit your
+# circumstances (and callsign)
+#
+
+package main;
+
+require Exporter;
+@ISA = qw(Exporter);
+
+@EXPORT_OK = qw($mycall $myname $mynormalcall $mylatitude $mylongtitude $mylocator
+                $myqth $myemail $myprot 
+                $clusterport $clusteraddr $debugfn 
+                $def_hopcount $root $data $system $cmd
+                               $userfn
+               );
+                          
+                          
+# this really does need to change for your system!!!!                     
+$mycall = "GB7TLH";
+
+# your name
+$myname = "Dirk";
+
+# Your 'normal' callsign 
+$mynormalcall = "G1TLH";
+
+# Your latitude (+)ve = North (-)ve = South in degrees and decimal degrees
+$mylatitude = +52.68584579;
+
+# Your Longtitude (+)ve = East, (-)ve = West in degrees and decimal degrees
+$mylongtitude = +0.94518260;
+
+# Your locator (yes I know I can calculate it - eventually)
+$mylocator = "JO02LQ";
+
+# Your QTH (roughly)
+$myqth = "East Dereham, Norfolk";
+
+# Your e-mail address
+$myemail = "djk@tobit.co.uk";
+
+# the tcp address of the cluster and so does this !!!
+$clusteraddr = "dirk1.tobit.co.uk";
+
+# the port number of the cluster (just leave this, unless it REALLY matters to you)
+$clusterport = 27754;
+
+# cluster debug file
+$debugfn = "/tmp/debug_cluster";
+
+# the version of DX cluster (tm) software I am masquerading as
+$myprot = "5447";
+
+# default hopcount to use - note this will override any incoming hop counts, if they are greater
+$def_hopcount = 7;
+
+# root of directory tree for this system
+$root = "/spider"; 
+
+# data files live in 
+$data = "$root/data";
+
+# system files live in
+$system = "$root/sys";
+
+# command files live in
+$cmd = "$root/cmd";
+
+# where the user data lives
+$userfn = "$data/users";
diff --git a/perl/Msg.pm b/perl/Msg.pm
new file mode 100644 (file)
index 0000000..7114eba
--- /dev/null
@@ -0,0 +1,329 @@
+#
+# This has been taken from the 'Advanced Perl Programming' book by Sriram Srinivasan 
+#
+# I am presuming that the code is distributed on the same basis as perl itself.
+#
+# I have modified it to suit my devious purposes (Dirk Koopman G1TLH)
+#
+package Msg;
+
+require Exporter;
+@ISA = qw(Exporter);
+
+use strict;
+use IO::Select;
+use IO::Socket;
+use Carp;
+
+use vars qw (%rd_callbacks %wt_callbacks $rd_handles $wt_handles);
+
+%rd_callbacks = ();
+%wt_callbacks = ();
+$rd_handles   = IO::Select->new();
+$wt_handles   = IO::Select->new();
+my $blocking_supported = 0;
+
+BEGIN {
+    # Checks if blocking is supported
+    eval {
+        require POSIX; POSIX->import(qw (F_SETFL O_NONBLOCK EAGAIN));
+    };
+    $blocking_supported = 1 unless $@;
+}
+
+#-----------------------------------------------------------------
+# Send side routines
+sub connect {
+    my ($pkg, $to_host, $to_port,$rcvd_notification_proc) = @_;
+    
+    # Create a new internet socket
+    
+    my $sock = IO::Socket::INET->new (
+                                      PeerAddr => $to_host,
+                                      PeerPort => $to_port,
+                                      Proto    => 'tcp',
+                                      Reuse    => 1);
+
+    return undef unless $sock;
+
+    # Create a connection end-point object
+    my $conn = {
+        sock                   => $sock,
+        rcvd_notification_proc => $rcvd_notification_proc,
+    };
+    
+    if ($rcvd_notification_proc) {
+        my $callback = sub {_rcv($conn, 0)};
+        set_event_handler ($sock, "read" => $callback);
+    }
+    return bless $conn, $pkg;
+}
+
+sub disconnect {
+    my $conn = shift;
+    my $sock = delete $conn->{sock};
+    return unless defined($sock);
+    set_event_handler ($sock, "read" => undef, "write" => undef);
+    close($sock);
+}
+
+sub send_now {
+    my ($conn, $msg) = @_;
+    _enqueue ($conn, $msg);
+    $conn->_send (1); # 1 ==> flush
+}
+
+sub send_later {
+    my ($conn, $msg) = @_;
+    _enqueue($conn, $msg);
+    my $sock = $conn->{sock};
+    return unless defined($sock);
+    set_event_handler ($sock, "write" => sub {$conn->_send(0)});
+}
+
+sub _enqueue {
+    my ($conn, $msg) = @_;
+    # prepend length (encoded as network long)
+    my $len = length($msg);
+    $msg = pack ('N', $len) . $msg; 
+    push (@{$conn->{queue}}, $msg);
+}
+
+sub _send {
+    my ($conn, $flush) = @_;
+    my $sock = $conn->{sock};
+    return unless defined($sock);
+    my ($rq) = $conn->{queue};
+
+    # If $flush is set, set the socket to blocking, and send all
+    # messages in the queue - return only if there's an error
+    # If $flush is 0 (deferred mode) make the socket non-blocking, and
+    # return to the event loop only after every message, or if it
+    # is likely to block in the middle of a message.
+
+    $flush ? $conn->set_blocking() : $conn->set_non_blocking();
+    my $offset = (exists $conn->{send_offset}) ? $conn->{send_offset} : 0;
+
+    while (@$rq) {
+        my $msg            = $rq->[0];
+        my $bytes_to_write = length($msg) - $offset;
+        my $bytes_written  = 0;
+        while ($bytes_to_write) {
+            $bytes_written = syswrite ($sock, $msg,
+                                       $bytes_to_write, $offset);
+            if (!defined($bytes_written)) {
+                if (_err_will_block($!)) {
+                    # Should happen only in deferred mode. Record how
+                    # much we have already sent.
+                    $conn->{send_offset} = $offset;
+                    # Event handler should already be set, so we will
+                    # be called back eventually, and will resume sending
+                    return 1;
+                } else {    # Uh, oh
+                    $conn->handle_send_err($!);
+                    return 0; # fail. Message remains in queue ..
+                }
+            }
+            $offset         += $bytes_written;
+            $bytes_to_write -= $bytes_written;
+        }
+        delete $conn->{send_offset};
+        $offset = 0;
+        shift @$rq;
+        last unless $flush; # Go back to select and wait
+                            # for it to fire again.
+    }
+    # Call me back if queue has not been drained.
+    if (@$rq) {
+        set_event_handler ($sock, "write" => sub {$conn->_send(0)});
+    } else {
+        set_event_handler ($sock, "write" => undef);
+    }
+    1;  # Success
+}
+
+sub _err_will_block {
+    if ($blocking_supported) {
+        return ($_[0] == EAGAIN());
+    }
+    return 0;
+}
+sub set_non_blocking {                        # $conn->set_blocking
+    if ($blocking_supported) {
+        # preserve other fcntl flags
+        my $flags = fcntl ($_[0], F_GETFL(), 0);
+        fcntl ($_[0], F_SETFL(), $flags | O_NONBLOCK());
+    }
+}
+sub set_blocking {
+    if ($blocking_supported) {
+        my $flags = fcntl ($_[0], F_GETFL(), 0);
+        $flags  &= ~O_NONBLOCK(); # Clear blocking, but preserve other flags
+        fcntl ($_[0], F_SETFL(), $flags);
+    }
+}
+sub handle_send_err {
+   # For more meaningful handling of send errors, subclass Msg and
+   # rebless $conn.  
+   my ($conn, $err_msg) = @_;
+   warn "Error while sending: $err_msg \n";
+   set_event_handler ($conn->{sock}, "write" => undef);
+}
+
+#-----------------------------------------------------------------
+# Receive side routines
+
+my ($g_login_proc,$g_pkg);
+my $main_socket = 0;
+sub new_server {
+    @_ == 4 || die "Msg->new_server (myhost, myport, login_proc)\n";
+    my ($pkg, $my_host, $my_port, $login_proc) = @_;
+    
+    $main_socket = IO::Socket::INET->new (
+                                          LocalAddr => $my_host,
+                                          LocalPort => $my_port,
+                                          Listen    => 5,
+                                          Proto     => 'tcp',
+                                          Reuse     => 1);
+    die "Could not create socket: $! \n" unless $main_socket;
+    set_event_handler ($main_socket, "read" => \&_new_client);
+    $g_login_proc = $login_proc; $g_pkg = $pkg;
+}
+
+sub rcv_now {
+    my ($conn) = @_;
+    my ($msg, $err) = _rcv ($conn, 1); # 1 ==> rcv now
+    return wantarray ? ($msg, $err) : $msg;
+}
+
+sub _rcv {                     # Complement to _send
+    my ($conn, $rcv_now) = @_; # $rcv_now complement of $flush
+    # Find out how much has already been received, if at all
+    my ($msg, $offset, $bytes_to_read, $bytes_read);
+    my $sock = $conn->{sock};
+    return unless defined($sock);
+    if (exists $conn->{msg}) {
+        $msg           = $conn->{msg};
+        $offset        = length($msg) - 1;  # sysread appends to it.
+        $bytes_to_read = $conn->{bytes_to_read};
+        delete $conn->{'msg'};              # have made a copy
+    } else {
+        # The typical case ...
+        $msg           = "";                # Otherwise -w complains 
+        $offset        = 0 ;  
+        $bytes_to_read = 0 ;                # Will get set soon
+    }
+    # We want to read the message length in blocking mode. Quite
+    # unlikely that we'll get blocked too long reading 4 bytes
+    if (!$bytes_to_read)  {                 # Get new length 
+        my $buf;
+        $conn->set_blocking();
+        $bytes_read = sysread($sock, $buf, 4);
+        if ($! || ($bytes_read != 4)) {
+            goto FINISH;
+        }
+        $bytes_to_read = unpack ('N', $buf);
+    }
+    $conn->set_non_blocking() unless $rcv_now;
+    while ($bytes_to_read) {
+        $bytes_read = sysread ($sock, $msg, $bytes_to_read, $offset);
+        if (defined ($bytes_read)) {
+            if ($bytes_read == 0) {
+                last;
+            }
+            $bytes_to_read -= $bytes_read;
+            $offset        += $bytes_read;
+        } else {
+            if (_err_will_block($!)) {
+                # Should come here only in non-blocking mode
+                $conn->{msg}           = $msg;
+                $conn->{bytes_to_read} = $bytes_to_read;
+                return ;   # .. _rcv will be called later
+                           # when socket is readable again
+            } else {
+                last;
+            }
+        }
+    }
+
+  FINISH:
+    if (length($msg) == 0) {
+        $conn->disconnect();
+    }
+    if ($rcv_now) {
+        return ($msg, $!);
+    } else {
+        &{$conn->{rcvd_notification_proc}}($conn, $msg, $!);
+    }
+}
+
+sub _new_client {
+    my $sock = $main_socket->accept();
+    my $conn = bless {
+        'sock' =>  $sock,
+        'state' => 'connected'
+    }, $g_pkg;
+    my $rcvd_notification_proc =
+        &$g_login_proc ($conn, $sock->peerhost(), $sock->peerport());
+    if ($rcvd_notification_proc) {
+        $conn->{rcvd_notification_proc} = $rcvd_notification_proc;
+        my $callback = sub {_rcv($conn,0)};
+        set_event_handler ($sock, "read" => $callback);
+    } else {  # Login failed
+        $conn->disconnect();
+    }
+}
+
+#----------------------------------------------------
+# Event loop routines used by both client and server
+
+sub set_event_handler {
+    shift unless ref($_[0]); # shift if first arg is package name
+    my ($handle, %args) = @_;
+    my $callback;
+    if (exists $args{'write'}) {
+        $callback = $args{'write'};
+        if ($callback) {
+            $wt_callbacks{$handle} = $callback;
+            $wt_handles->add($handle);
+        } else {
+            delete $wt_callbacks{$handle};
+            $wt_handles->remove($handle);
+        }
+    }
+    if (exists $args{'read'}) {
+        $callback = $args{'read'};
+        if ($callback) {
+            $rd_callbacks{$handle} = $callback;
+            $rd_handles->add($handle);
+        } else {
+            delete $rd_callbacks{$handle};
+            $rd_handles->remove($handle);
+       }
+    }
+}
+
+sub event_loop {
+    my ($pkg, $loop_count, $timeout) = @_; # event_loop(1) to process events once
+    my ($conn, $r, $w, $rset, $wset);
+    while (1) {
+        # Quit the loop if no handles left to process
+        last unless ($rd_handles->count() || $wt_handles->count());
+        ($rset, $wset) =
+            IO::Select->select ($rd_handles, $wt_handles, undef, $timeout);
+        foreach $r (@$rset) {
+            &{$rd_callbacks{$r}} ($r) if exists $rd_callbacks{$r};
+        }
+        foreach $w (@$wset) {
+            &{$wt_callbacks{$w}}($w) if exists $wt_callbacks{$w};
+        }
+        if (defined($loop_count)) {
+            last unless --$loop_count;
+        }
+    }
+}
+
+1;
+
+__END__
+
diff --git a/perl/client.pl b/perl/client.pl
new file mode 100755 (executable)
index 0000000..c508d94
--- /dev/null
@@ -0,0 +1,111 @@
+#!/usr/bin/perl
+#
+# A thing that implements dxcluster 'protocol'
+#
+# This is a perl module/program that sits on the end of a dxcluster
+# 'protocol' connection and deals with anything that might come along.
+#
+# this program is called by ax25d and gets raw ax25 text on its input
+#
+# Copyright (c) 1998 Dirk Koopman G1TLH
+#
+# 
+
+use Msg;
+use DXVars;
+
+$mode = 1;                      # 1 - \n = \r as EOL, 2 - \n = \n, 0 - transparent
+$call = "";                     # the callsign being used
+@stdoutq = ();                  # the queue of stuff to send out to the user
+$conn = 0;                      # the connection object for the cluster
+$lastbit = "";                  # the last bit of an incomplete input line
+
+# cease communications
+sub cease
+{
+  my $sendz = shift;
+  if (defined $conn && $sendz) {
+    $conn->send_now("Z$call|bye...\n");
+  }
+  exit(0);     
+}
+
+# terminate program from signal
+sub sig_term
+{
+  cease(1);
+}
+
+# handle incoming messages
+sub rec_socket
+{
+  my ($con, $msg, $err) = @_;
+  if (defined $err && $err) {
+    cease(1);
+  }
+  if (defined $msg) {
+    my ($sort, $call, $line) = $msg =~ /^(\w)(\S+)|(.*)$/;
+       
+       if ($sort eq 'D') {
+          my $nl = ($mode == 1) ? "\r" : "\n";
+          $nl = "" if $mode == 0;
+          $line =~ s/\n/\r/o if $mode == 1;
+          print $line, $nl;
+       } elsif ($sort eq 'M') {
+         $mode = $line;               # set new mode from cluster
+       } elsif ($sort eq 'Z') {       # end, disconnect, go, away .....
+         cease(0);
+    }    
+  } 
+}
+
+sub rec_stdin
+{
+  my ($fh) = @_;
+  my $buf;
+  my @lines;
+  my $r;
+  my $first;
+  my $dangle = 0;
+  
+  $r = sysread($fh, $buf, 1024);
+#  print "sys: $r $buf";
+  if ($r > 0) {
+    if ($mode) {
+         $buf =~ s/\r/\n/o if $mode == 1;
+         $dangle = !($buf =~ /\n$/);
+         @lines = split /\n/, $buf;
+         if ($dangle) {                # pull off any dangly bits
+           $buf = pop @lines;
+         } else {
+           $buf = "";
+         }
+         $first = shift @lines;
+         unshift @lines, ($lastbit . $first) if ($first);
+         foreach $first (@lines) {
+           $conn->send_now("D$call|$first");
+         }
+         $lastbit = $buf;  
+       } else {
+         $conn->send_now("D$call|$buf");
+       }
+  } elsif ($r == 0) {
+    cease(1);
+  }
+}
+
+$call = uc $ARGV[0];
+die "client.pl <call> [<mode>]\r\n" if (!$call);
+$mode = $ARGV[1] if (@ARGV > 1);
+
+select STDOUT; $| = 1;
+
+$SIG{'INT'} = \&sig_term;
+$SIG{'TERM'} = \&sig_term;
+$SIG{'HUP'} = \&sig_term;
+
+$conn = Msg->connect("$clusteraddr", $clusterport, \&rec_socket);
+$conn->send_now("A$call|start");
+Msg->set_event_handler(\*STDIN, "read" => \&rec_stdin);
+Msg->event_loop();
+
diff --git a/perl/cluster.pl b/perl/cluster.pl
new file mode 100755 (executable)
index 0000000..c324930
--- /dev/null
@@ -0,0 +1,138 @@
+#!/usr/bin/perl
+#
+# A thing that implements dxcluster 'protocol'
+#
+# This is a perl module/program that sits on the end of a dxcluster
+# 'protocol' connection and deals with anything that might come along.
+#
+# this program is called by ax25d and gets raw ax25 text on its input
+#
+# Copyright (c) 1998 Dirk Koopman G1TLH
+#
+# 
+
+use Msg;
+use DXVars;
+use DXUtil;
+use DXConnect;
+use DXUser;
+
+package main;
+
+@inqueue = undef;                # the main input queue, an array of hashes 
+
+# handle out going messages
+sub send_now
+{
+  my ($conn, $sort, $call, $line) = @_;
+
+  print DEBUG "$t > $sort $call $line\n" if defined DEBUG;
+  print "> $sort $call $line\n";
+  $conn->send_now("$sort$call|$line");
+}
+
+sub send_later
+{
+  my ($conn, $sort, $call, $line) = @_;
+
+  print DEBUG "$t > $sort $call $line\n" if defined DEBUG;
+  print "> $sort $call $line\n";
+  $conn->send_later("$sort$call|$line");
+}
+
+# handle disconnections
+sub disconnect
+{
+  my $dxconn = shift;
+  my ($user) = $dxconn->{user};
+  my ($conn) = $dxconn->{conn};
+  $user->close() if defined $user;
+  $conn->disconnect();
+  $dxconn->del();
+}
+
+# handle incoming messages
+sub rec
+{
+  my ($conn, $msg, $err) = @_;
+  my $dxconn = DXConnect->get_by_cnum($conn);      # get the dxconnnect object for this message
+  
+  if (defined $err && $err) {
+    disconnect($dxconn);
+       return;
+  } 
+  if (defined $msg) {
+    my $self = bless {}, "inqueue";
+    $self->{dxconn} = $dxconn;
+    $self->{data} = $msg;
+       push @inqueue, $self;
+  }
+}
+
+sub login
+{
+  return \&rec;
+}
+
+# cease running this program, close down all the connections nicely
+sub cease
+{
+  my $dxconn;
+  foreach $dxconn (DXConnect->get_all()) {
+    disconnect($dxconn);
+  }
+}
+
+# this is where the input queue is dealt with and things are dispatched off to other parts of
+# the cluster
+sub process_inqueue
+{
+  my $self = shift @inqueue;
+  return if !$self;
+  
+  my $data = $self->{data};
+  my $dxconn = $self->{dxconn};
+  my ($sort, $call, $line) = $data =~ /^(\w)(\S+)|(.*)$/;
+  
+  # do the really sexy console interface bit! (Who is going to do the TK interface then?)
+  print DEBUG atime, " < $sort $call $line\n" if defined DEBUG;
+  print "< $sort $call $line\n";
+  
+  # handle A records
+  if ($sort eq 'A') {
+    if ($dxconn) {                         # there should not be one of these, disconnect
+
+       }
+    my $user = DXUser->get($call);         # see if we have one of these
+  }
+  
+}
+
+#############################################################
+#
+# The start of the main line of code 
+#
+#############################################################
+
+# open the debug file, set various FHs to be unbuffered
+open(DEBUG, ">>$debugfn") or die "can't open $debugfn($!)\n";
+select DEBUG; $| = 1;
+select STDOUT; $| = 1;
+
+# initialise User file system
+DXUser->init($userfn);
+
+# start listening for incoming messages/connects
+Msg->new_server("$clusteraddr", $clusterport, \&login);
+
+# prime some signals
+$SIG{'INT'} = \&cease;
+$SIG{'TERM'} = \&cease;
+$SIG{'HUP'} = 'IGNORE';
+
+# this, such as it is, is the main loop!
+for (;;) {
+  Msg->event_loop(1, 0.001);
+  process_inqueue();
+}
+
diff --git a/perl/msgdemo.pl b/perl/msgdemo.pl
new file mode 100644 (file)
index 0000000..9ea4056
--- /dev/null
@@ -0,0 +1,57 @@
+
+# 
+# testmsg.pl - Used for testing the Msg.pm module
+#    Invoke as testmsg.pl {-client|-server} 
+#
+use Msg;
+use strict;
+
+my $i = 0;
+sub rcvd_msg_from_server {
+    my ($conn, $msg, $err) = @_;
+    if (defined $msg) {
+        die "Strange... shouldn't really be coming here\n";
+    }
+}
+
+my $incoming_msg_count=0;
+
+sub rcvd_msg_from_client {
+    my ($conn, $msg, $err) = @_;
+    if (defined $msg) {
+        ++$i;
+        my $len = length ($msg);
+        print "$i ($len)\n";
+    }
+}
+
+sub login_proc {
+    # Unconditionally accept
+    \&rcvd_msg_from_client;
+}
+
+my $host = 'localhost';
+my $port = 8080;
+my $prog;
+foreach $prog (@ARGV) {
+   if ($prog eq '-server') {
+       Msg->new_server($host, $port, \&login_proc);
+       print "Server created. Waiting for events";
+       Msg->event_loop();
+   } elsif ($prog eq '-client') {
+       my $conn = Msg->connect($host, $port,
+                               \&rcvd_msg_from_server);
+                               
+       die "Client could not connect to $host:$port\n" unless $conn;
+       print "Connection successful.\n";
+       my $i;
+       my $msg = " " x 10000;
+       for ($i = 0; $i < 100; $i++) {
+           print "Sending msg $i\n";
+           $conn->send_now($msg);
+       }
+       $conn->disconnect();
+       Msg->event_loop();
+   }
+}
+
diff --git a/perl/persist.c b/perl/persist.c
new file mode 100644 (file)
index 0000000..d0839e0
--- /dev/null
@@ -0,0 +1,48 @@
+       
+       /* persistent.c */
+#include <EXTERN.h>
+#include <perl.h>
+       
+       /* 1 = clean out filename's symbol table after each request, 0 = don't */
+#ifndef DO_CLEAN
+# define DO_CLEAN 0
+#endif
+       
+static PerlInterpreter *perl = NULL;
+
+int    main(int argc, char **argv, char **env)
+{
+       char *embedding[] = { "", "persistent.pl"};
+       char *args[] = { "", DO_CLEAN, NULL     };
+       char filename [1024];
+       int exitstatus = 0;
+       
+       if ((perl = perl_alloc()) == NULL) {
+               fprintf(stderr, "no memory!");
+               exit(1);
+       }
+       perl_construct(perl);
+       
+       exitstatus = perl_parse(perl, NULL, 2, embedding, NULL);
+       
+       if(!exitstatus) {
+               exitstatus = perl_run(perl);
+               
+               while(printf("Enter file name: ") && gets(filename)) {
+                       
+                       /* call the subroutine, passing it the filename as an argument */
+                       args[0] = filename;
+                       perl_call_argv("Embed::Persistent::eval_file",
+                                                  G_DISCARD | G_EVAL, args);
+                       
+                       /* check $@ */
+                       if(SvTRUE(GvSV(errgv)))
+                               fprintf(stderr, "eval error: %s\n", SvPV(GvSV(errgv),na));
+               }
+       }
+       
+       perl_destruct_level = 0;
+       perl_destruct(perl);
+       perl_free(perl);
+       exit(exitstatus);
+}
diff --git a/perl/persistent.pl b/perl/persistent.pl
new file mode 100644 (file)
index 0000000..39e993b
--- /dev/null
@@ -0,0 +1,72 @@
+package Embed::Persistent;
+#persistent.pl
+
+#require Devel::Symdump;  
+use strict;
+use vars '%Cache';
+
+sub valid_package_name {
+       my($string) = @_;
+       $string =~ s/([^A-Za-z0-9\/])/sprintf("_%2x",unpack("C",$1))/eg;
+#second pass only for words starting with a digit
+       $string =~ s|/(\d)|sprintf("/_%2x",unpack("C",$1))|eg;
+       
+#Dress it up as a real package name
+       $string =~ s|/|::|g;
+       return "Embed" . $string;
+}
+
+#borrowed from Safe.pm
+sub delete_package {
+       my $pkg = shift;
+       my ($stem, $leaf);
+       
+       no strict 'refs';
+       $pkg = "main::$pkg\::";    # expand to full symbol table name
+               ($stem, $leaf) = $pkg =~ m/(.*::)(\w+::)$/;
+       
+       my $stem_symtab = *{$stem}{HASH };
+       
+       delete $stem_symtab->{$leaf     };
+ }
+
+sub eval_file {
+       my($filename, $delete) = @_;
+       my $package = valid_package_name($filename);
+       my $mtime = -M $filename;
+       if(defined $Cache{$package}{mtime} && $Cache{$package}{mtime } <= $mtime) {
+#we have compiled this subroutine already,
+#it has not been updated on disk, nothing left to do
+               print STDERR "already compiled $package->handler\n";
+       } else {
+               local *FH;
+               open FH, $filename or die "open '$filename' $!";
+               local($/) = undef;
+               my $sub = <FH>;
+               close FH;
+               
+#wrap the code into a subroutine inside our unique package
+               my $eval = qq{package $package; sub handler { $sub; }};
+               {
+#hide our variables within this block
+                       my($filename,$mtime,$package,$sub);
+                       eval $eval;
+               }
+               die $@ if $@;
+               
+#cache it unless we're cleaning out each time
+               $Cache{$package}{mtime} = $mtime unless $delete;
+}
+
+eval {$package->handler;};
+die $@ if $@;
+
+delete_package($package) if $delete;
+
+#take a look if you want
+#print Devel::Symdump->rnew($package)->as_string, $/;
+}
+
+1;
+
+__END__