7.1. Programming in an Event-Driven Environment

If you've ever programmed a graphical application using something like Tk or Gtk, you'll know that it's a little different from ordinary procedural programming. In normal programming, you write a sequence of things you'd like the program to do, and it does them. However, GUIs don't work like that; instead, you set up an environment (a window, for instance) that responds to certain events (clicking a button or selecting a menu item). This is called the event-driven paradigm.

It's not just GUIs that use this paradigm. For example, a network server does not work through a fixed sequence of steps; it sits waiting for a connection (an event), and then services the connection depending on the input from the client. When the client is done and disconnects, it goes back to waiting for the next event.

Similarly, you could write something that watches over a directory; it sits around watching, periodically looking at the files in the directory, and as it detects changes made to the files, it fires off certain responses.
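The polling step of such a watcher might look something like the following sketch in plain Perl. The name changed_files and the snapshot hash it updates are invented for illustration, and comparing modification time and size is only one possible way to detect a change.

```perl
use strict;
use warnings;

# Compare a directory's current contents with a snapshot from the last poll.
# changed_files and the $seen snapshot are illustrative names only.
sub changed_files {
    my ($dir, $seen) = @_;    # $seen: hash ref of file name => "mtime:size"
    my @changed;
    opendir my $dh, $dir or die "Cannot open $dir: $!";
    for my $name ( sort grep { -f "$dir/$_" } readdir $dh ) {
        my @st = stat "$dir/$name" or next;    # file vanished since readdir
        my $stamp = "$st[9]:$st[7]";           # modification time and size
        push @changed, $name
            if !defined $seen->{$name} || $seen->{$name} ne $stamp;
        $seen->{$name} = $stamp;               # remember for the next poll
    }
    closedir $dh;
    return @changed;
}
```

A watcher would call changed_files every few seconds with the same hash reference and fire off its response for each name returned; the first call reports every file, since the snapshot starts out empty.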

The core of the event-driven paradigm is the event loop, sometimes called the main loop. Tk has one, the Event module has one, and POE, an event-driven environment, has one. POE's event loop is handled by the POE kernel.
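To make the idea concrete, here is a toy event loop in plain Perl. It is only a sketch of the general shape, not how POE's kernel is implemented; %handlers, @queue, post_event, and run_loop are all invented names.

```perl
use strict;
use warnings;

# A toy event loop: NOT POE's implementation, just the general shape.
my %handlers;    # event name => code reference
my @queue;       # pending events, oldest first
my @log;         # what the handlers did, kept for inspection

# Queue an event; nothing runs until the loop gets around to it.
sub post_event {
    my ($name, @args) = @_;
    push @queue, [ $name, @args ];
}

# Dispatch queued events, oldest first, until none are left.
sub run_loop {
    while ( my $event = shift @queue ) {
        my ($name, @args) = @$event;
        my $handler = $handlers{$name} or next;    # ignore unknown events
        $handler->(@args);
    }
}

$handlers{start} = sub {
    push @log, "Setting up";
    post_event( "hello", "world" );    # handled on a later pass of the loop
};
$handlers{hello} = sub {
    my ($whom) = @_;
    push @log, "Hello, $whom";
};

post_event("start");
run_loop();
print "$_\n" for @log;
```

Handlers never call each other directly here: they queue an event and return, and the loop decides what runs next.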

As we've said, POE can be thought of as a minute operating system, and so the name kernel is no coincidence. When an ordinary operating system's kernel has finished setting up the working environment, it too sits back and waits for events. These can be system calls from user space, or they can be hardware interrupts. As well as servicing events, it takes care of passing messages between different components, typically communication between processes (IPC).

POE's kernel also services events and handles communications between different parts of the POE world, although POE's equivalent of a process is called a session.

7.1.1. Hello, POE

There's been a lot of talk so far and very little code, so let's rectify this with a brief example.

     use POE;

     POE::Session->create(
         inline_states => {
             _start => \&start,
             hello  => \&hello,
         },
     );

     print "Running Kernel\n";
     $poe_kernel->run();
     print "Exiting\n";
     exit(0);

     sub start {
         my ($kernel) = $_[KERNEL];
         print "Setting up a session\n";
         $kernel->yield("hello");
     }

     sub hello { print "Hello, world\n"; }

This is the POE equivalent of the famous Hello World program. If we're going to continue to think in operating system terms (which will shortly become unhelpful, but will do for now) then we're starting up a machine's kernel and creating a single process that prints out Hello World and then exits. Let's look at the different pieces of this in turn.

     use POE;

     print "Running Kernel\n";
     $poe_kernel->run();
     print "Exiting\n";
     exit(0);

This is the core of any POE program; the variable $poe_kernel is provided by the POE module and represents the POE kernel itself. In many cases the call to run will never return; for instance, a network server should sit in a loop accepting new connections until something awful happens. In our case, however, we're only setting up one brief session that soon terminates. Newer code may prefer to say POE::Kernel->run, which is pretty much the same.

     POE::Session->create(
         inline_states => {
             _start => \&start,
             hello  => \&hello,
         },
     );

This creates a session. A session can be thought of as a state machine with multiple states, or as a handler for multiple events; the two representations are equivalent. In state-speak, the preceding example defines two states in the inline_states parameter passed to the constructor. States whose names begin with an underscore are predefined by POE, whereas all other states are user-defined. The session automatically enters the _start state after it has been successfully constructed.

If you prefer an event-driven explanation, then we say that our session responds to the _start event and the hello event, and POE posts a _start event to the session as soon as it has been created.

There are other predefined events, most of which have to do with parent/child relationships and signals; there's also the _stop event, which is posted when a session is due to finish. Let's now see how we handle the events that we've defined:

     sub start {
         my ($kernel) = $_[KERNEL];
         print "Setting up a session\n";
         $kernel->yield("hello");
     }

     sub hello { print "Hello, world\n"; }

POE passes our start handler a number of parameters, one of which is a handle on the POE kernel. We extract this from the parameter list using the KERNEL constant. For the sake of efficiency, POE uses constants like this for indexes into @_, rather than a parameter hash. You'll often see POE state handlers that start something like this:

     my ($kernel, $heap, $session) = @_[KERNEL, HEAP, SESSION];

This is just an ordinary array slice with constant indexes, returning the POE kernel, the heap, and the current session object. The heap is a place where a session can store its private, per-session stuff. We'll come back to what sort of stuff is good to store in a heap later.
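If the slice syntax is unfamiliar, it can be tried out without POE at all. In this sketch the constants MY_KERNEL, MY_HEAP, and MY_SESSION and their numeric values are made up; POE exports its own constants, and its handlers receive real objects rather than the strings used here.

```perl
use strict;
use warnings;

# Stand-in constants for this sketch only. POE exports its own KERNEL, HEAP
# and SESSION; the names and numeric values here are made up.
use constant { MY_KERNEL => 0, MY_HEAP => 1, MY_SESSION => 2 };

sub handler {
    # An array slice of @_: pick out just the fields we want, in any order.
    my ($heap, $kernel) = @_[ MY_HEAP, MY_KERNEL ];
    return "kernel=$kernel heap=$heap";
}

print handler( "the-kernel", "the-heap", "the-session" ), "\n";
# prints "kernel=the-kernel heap=the-heap"
```

Because the indexes are compile-time constants, picking fields out of @_ costs no more than an ordinary array lookup, and a handler can ignore any fields it doesn't need.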

Now that we have the kernel, what do we do with it? Well, we tell it that we want to be in another state, the hello state:

     $kernel->yield("hello");

We're yielding because we're posting an event to the current session; if we had stored a handle to another session, we could communicate with it by posting an event to it using the post method. We'll see examples of this later on.

So our start-up state has told the POE kernel that soon we want to move to the hello state. This will not happen, however, until the next time POE runs over its event loop. Once we run the loop with $poe_kernel->run, the kernel looks at its list of pending tasks, finds that the first thing it needs to do is move our session into the hello state, and fires off the appropriate handler, which prints our Hello, world message.

7.1.2. Hello, Again, POE!

Suppose we now want the message to repeat every five seconds. We could, of course, do this:

     sub hello {
         my ($kernel) = $_[KERNEL];
         print "Hello, world!\n";
         sleep 5;
         $kernel->yield("hello");
     }

However, this is no way to behave in a cooperative multitasking environment. We can't simply hog the whole kernel for five seconds, because other sessions may have things to do: there might be things coming in from the network that need immediate servicing, and so on. Instead, we need to allow the kernel to schedule the hello state for five seconds in the future. We do this with the kernel's delay_set method:

     sub hello {
         my ($kernel) = $_[KERNEL];
         print "Hello, world!\n";
         $kernel->delay_set("hello", 5);
     }

Now we're a little more polite. Let's now see what we can do with two different sessions running. Here's some code lightly modified from Matt Sergeant's wonderful POE tutorial (http://www.axkit.org/docs/presentations/tpc2002/poe/):

     use POE;

     for my $session_no (1..2) {
         POE::Session->create(
             inline_states => {
                 hello  => \&hello,
                 _start => sub { $_[KERNEL]->alias_set("session_" . $session_no) },
             },
         );
     }

     $poe_kernel->post("session_1", "hello", "session_2");
     $poe_kernel->run();
     exit(0);

     sub hello {
         my ($kernel, $session, $next) = @_[KERNEL, SESSION, ARG0];
         print "Event in ", $kernel->alias_list($session), "\n";
         $kernel->post($next, "hello", $session->ID);
     }

We've seen much of this before; we create a session (this time, we create two of them) that has a start handler and a handler for the hello event. Notice that both sessions are sharing the code for the two handlers (_start and hello), although the data passed to the code will be quite different in each case.

This time, the start handler does something a little different from the previous program. It tells the kernel to register an alias for this session. Each session has an internal ID (which we also use later in the program) but that's really known only to POE when the sessions are created. By registering a programmer-friendly alias, we get a handle by which we can refer to the session later in the program.

Again in order to be programmer-friendly, we can ask the kernel for a session's alias in order to output our messages in an understandable manner:

     print "Event in ", $kernel->alias_list($session), "\n";

Now that we have two sessions going on, we need to tell the kernel which of them is going to start the action, and we do this by posting a hello event to session 1, referred to by its alias:

     $poe_kernel->post("session_1", "hello", "session_2");

When we're posting or yielding events, we can pass additional parameters with the event, which get passed in to the event's handler. These arguments arrive in @_ starting at position ARG0. If we had many arguments, we could say something like this to collect them all up:

       my ($kernel, $session, @args) = @_[KERNEL, SESSION, ARG0..$#_];

But here we are only interested in the first argument, which is the name of the next session to call. Session 1 passes control to session 2, and vice versa. Now that we're up and running, we don't need to be programmer-friendly any more, so we can identify the next session to run by its internal session ID:

     $kernel->post($next, "hello", $session->ID);

This says "I'm calling you now, and next time around you call me (by ID)."

With these two sessions running, we now have a cooperative multitasking environment:

     Event in session_1
     Event in session_2
     Event in session_1
     Event in session_2
     ...

However, if we're going to do anything interesting with our newfound environment, we have to start looking at POE's provisions for more complex I/O.

7.1.3. Wheels

Wheels are the driving force (hah, hah) of POE's I/O system. A wheel is a connection to the outside world that generates events. You can think of wheels as POE's equivalent to filehandles, but there's more to them than that.

The simplest wheel to understand is POE::Wheel::FollowTail, which follows an ever-growing file. You give a filename to the wheel, and it generates events when that file has more data in it. Here's a nice compact example:

     use POE qw(Wheel::FollowTail);

     POE::Session->create(
         inline_states => {
             _start => sub {
                 my ($heap) = $_[HEAP];
                 my $log_watcher = POE::Wheel::FollowTail->new(
                     Filename   => "my_log_file.txt",
                     InputEvent => "got_record",
                 );
                 $heap->{watcher} = $log_watcher;
             },
             got_record => sub { my $record = $_[ARG0]; print $record, "\n"; },
         },
     );

     $poe_kernel->run();

First, notice the compact way of loading up multiple POE modules; any parameters to use POE will be interpreted as module names under POE:: and use'd in turn.

As before, we have two states. The got_record state is nice and easy to understand: it prints its argument. Let's have a look at the _start state in a little more detail, though:

     my $log_watcher = POE::Wheel::FollowTail->new(
         Filename   => "my_log_file.txt",
         InputEvent => "got_record",
     );

The job of our start state is to set up our wheel. We tell the wheel to watch the file my_log_file.txt and post a got_record event every time it sees a new line.

What do we do with our wheel? We'd like it to persist for the duration of the session (else it's pretty useless), but as it's just an ordinary Perl object, it'll be destroyed at the end of the current block if we don't store it somewhere. Now we see the immediate value of having a per-session storage area, the heap:

     my ($heap) = $_[HEAP];
     . . .
     $heap->{watcher} = $log_watcher;

And this is all we need; the wheel happily sits there watching the file and generating events, and our event handler prints out the line that was seen. Now let's add another wheel into the equation.

Let's suppose, for some reason, our log file is actually binary data and we want to print out new lines in hexadecimal using the hexdump command.[*]

[*] If you don't have a hexdump command in your operating system, demand one! You can also mock one with Perl, of course. Something like this ought to do the trick:

     my $i = -16;
     binmode(STDIN);
     my $data;
     $|++;
     printf "%07x " . ("%02x%02x " x 8) . "\n", $i += 16, map ord, split //, $data
         while read STDIN, $data, 16;

The POE::Wheel::Run wheel handles I/O with regard to external programs. We can simply create a wheel that calls hexdump, and feed it the data we get:

     use POE qw(Wheel::FollowTail Wheel::Run);

     POE::Session->create(
         inline_states => {
             _start => sub {
                 my ($heap) = $_[HEAP];
                 my $log_watcher = POE::Wheel::FollowTail->new(
                     Filename   => "my_log_file.txt",
                     InputEvent => "redirect",
                 );
                 my $dumper = POE::Wheel::Run->new(
                     Program     => "/usr/bin/hexdump",
                     StdoutEvent => "print",
                 );
                 # Keep both wheels alive for the life of the session
                 $heap->{watcher} = $log_watcher;
                 $heap->{dumper}  = $dumper;
             },
             # Pass each new record from the log straight to hexdump
             redirect => sub {
                 my ($heap, $data) = @_[HEAP, ARG0];
                 $heap->{dumper}->put($data);
             },
             # Print whatever hexdump writes to its standard output
             print => sub { my $record = $_[ARG0]; print $record, "\n"; },
         },
     );

     $poe_kernel->run();

Let's look at a diagram of what's going on in Figure 7-1.

Figure 7-1. Filtered log tailing


The FollowTail wheel reads new data from the file and sends it to the session, which sends it straight back out to the Run wheel. The Run wheel feeds it to hexdump and generates print events from hexdump's output, and the handler for those events prints the data. Wonderful.

Except it doesn't work. If we try to run this with an ordinary Unix hexdump, all our data disappears into the ether and is never seen again. But here's an interesting thing: if we use our makeshift Perl hexdump, it works just fine. Can you guess why this is?

The key is in the magic $|++ in our version. The system's hexdump buffers its output completely if it senses that it's connected to a pipe. Since our program isn't supposed to terminate, hexdump just sits there buffering data until we break, at which point everything is lost. We need to trick hexdump into thinking that it's connected to a real terminal. Unsurprisingly, POE provides a way to do this:

     my $dumper = POE::Wheel::Run->new(
         Program     => "/usr/bin/hexdump",
         Conduit     => "pty",
         StdoutEvent => "print"
     );

There are various other wheels you can fit together like this: POE::Wheel::Curses reads data using the non-blocking Curses interface library, whereas POE::Wheel::ReadLine uses Term::ReadKey to implement a line-based editable console input interface. POE::Wheel::ListenAccept is a low-level socket-based listener. We'll look at two of the more important wheels in our next example, POE::Wheel::ReadWrite and POE::Wheel::SocketFactory.

7.1.4. A Port Forwarder

You know the story. You're at work. You're behind an aggressive firewall that won't let you IRC. You simply can't work without IRC, so you perform some nasty shenanigans. You're going to set up some forwarders so that when you connect to port 6667 on your local machine, it heads off to port 80 (which is allowed through the firewall) on your hosted box out in the real world. Then another forwarder will listen on port 80 of that machine and forward connections through to port 6667 on the IRC server. You set your IRC client to connect to localhost, and, boom, you're connected right through. Let's see how POE can help you lose your job.

This example was inspired by the wonderful POE Cookbook (http://poe.perl.org/?POE_Cookbook) and a certain large accounting company's overly restrictive firewall.


Let's start by setting up the server that listens for connections:

     my $office = shift;
     my ($local_address, $local_port, $remote_address, $remote_port);

     ($office ? $remote_address : $local_address) = "mybox.real-world.int";
     ($office ? $local_port     : $remote_port)   = 6667;
     ($office ? $remote_port    : $local_port)    = 80;

     if ($office) {
         $local_address = "127.0.0.1";
     } else {
         $remote_address = "irc.perl.org";
     }

     POE::Session->new
       ( _start           => \&server_start,
         client_connected => \&client_connected,
         [ $local_address, $local_port, $remote_address, $remote_port ]
       );

     $poe_kernel->run;

Once we've worked out whether we're the forwarder from the office to the hosted machine or from the hosted machine to the eventual server, we set up the various addresses and ports, and create a new session with the appropriate parameters. This one session starts up all the other sessions we need. As we're dealing with three parties in this forwarding exchange (the socket we bind to, the client that connects to us, and the server that we tunnel to), we need three sessions and three wheels.
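The conditional assignments are compact enough to be worth checking by hand. The following sketch wraps the same logic in a helper so that both modes can be printed side by side; forwarder_settings is an invented name and is not part of the forwarder itself.

```perl
use strict;
use warnings;

# forwarder_settings is an illustrative helper, not part of the forwarder;
# it repeats the conditional assignments and reports what they resolve to.
sub forwarder_settings {
    my ($office) = @_;
    my ($local_address, $local_port, $remote_address, $remote_port);

    # A ?: expression whose branches are both variables can be assigned to.
    ($office ? $remote_address : $local_address) = "mybox.real-world.int";
    ($office ? $local_port     : $remote_port)   = 6667;
    ($office ? $remote_port    : $local_port)    = 80;

    if ($office) { $local_address  = "127.0.0.1" }
    else         { $remote_address = "irc.perl.org" }

    return "$local_address:$local_port -> $remote_address:$remote_port";
}

print "office: ", forwarder_settings(1), "\n";
print "hosted: ", forwarder_settings(0), "\n";
# office: 127.0.0.1:6667 -> mybox.real-world.int:80
# hosted: mybox.real-world.int:80 -> irc.perl.org:6667
```

In office mode we listen locally on the IRC port and connect out on port 80; on the hosted box the roles are reversed.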

We've omitted a lot of error handling in this and later sessions, partly for clarity of the explanation, and partly because if an error does happen while, say, accepting a connection, there's very little you can do about it other than ignore it and wait for the next successful connection.

But you shouldn't do that, of course. Even just logging an error and then doing nothing about it shows you've thought it through a little.


The first wheel comes in the server's start state; this has to set up a listener on the appropriate address and port, which we'll do with the SocketFactory wheel:

     sub server_start {
         my ( $heap, $local_addr, $local_port, $remote_addr, $remote_port )
           = @_[ HEAP, ARG0, ARG1, ARG2, ARG3 ];

         # Store our parameters
         $heap->{local_addr}  = $local_addr;
         $heap->{local_port}  = $local_port;
         $heap->{remote_addr} = $remote_addr;
         $heap->{remote_port} = $remote_port;

         # Create and store a wheel
         $heap->{server_wheel} = POE::Wheel::SocketFactory->new
           ( BindAddress  => $local_addr,
             BindPort     => $local_port,
             Reuse        => 'yes',
             SuccessEvent => 'client_connected'
           );
     }

When the SocketFactory wheel accepts a connection and posts a client_connected event, it passes the socket and the peer address and port like so:

     sub client_connected {
         my ( $heap, $socket, $peer_addr, $peer_port ) =
           @_[ HEAP, ARG0, ARG1, ARG2 ];
     }

Now we have a server that listens for and accepts connections, but what do we do once we've accepted one? In an ordinary, non-POE application, we'd probably fork here or create a new thread to service the request so we can immediately get back to listening for new connections. In POE terms, we create a new session to handle the client. Remembering that we've stored our connection parameters in the first session's heap, we can pass these on to the new session.

     # Handler for the client_connected event registered by the server session
     sub client_connected {
         my ( $heap, $socket, $peer_addr, $peer_port ) =
           @_[ HEAP, ARG0, ARG1, ARG2 ];

         # One new session per accepted client
         POE::Session->new
           ( _start         => \&forwarder_start,
             server_connect => \&connected_to_other_side,
             client_input   => \&forward_outbound,
             server_input   => \&forward_inbound,
             [ $socket, $peer_addr, $peer_port,
               $heap->{remote_addr}, $heap->{remote_port} ]
           );
     }

When this session starts up, it needs to set up the connection to the final destination and get ready to read and write data from the client. We do this by passing the client $socket we received to our second wheel, POE::Wheel::ReadWrite, POE's generic I/O wheel. Just like in a non-POE environment, we reuse the socket that we've been using to handle the connection as a filehandle to read from and write to.

Let's stop for a moment and look at a diagram of what we've got so far in Figure 7-2.

Figure 7-2. POE forwarder: step 1


So far we've taken care of the client that has connected to us; we also want another wheel to connect us to the server at the other end of the forwarding tunnel.

     sub forwarder_start {
         my ( $heap, $session,
              $socket, $peer_host, $peer_port, $remote_addr, $remote_port
            ) =
           @_[ HEAP, SESSION, ARG0, ARG1, ARG2, ARG3, ARG4 ];

         # Hash slice through the reference: stores all four values at once
         @{$heap}{qw(peer_host    peer_port   remote_addr   remote_port)} =
                   ($peer_host, $peer_port, $remote_addr, $remote_port);

         # Wheel for the client that connected to us
         $heap->{wheel_client} = POE::Wheel::ReadWrite->new
           ( Handle     => $socket,
             Filter     => POE::Filter::Stream->new,
             InputEvent => 'client_input',
           );

         # Wheel that connects out to the other end of the tunnel
         $heap->{wheel_server} = POE::Wheel::SocketFactory->new
           ( RemoteAddress => $remote_addr,
             RemotePort    => $remote_port,
             SuccessEvent  => 'server_connect',
           );
     }

We'll add one slight detail to that; since we're trying to do everything as asynchronously as possible, we have to look out for the case where we're still establishing a connection to the server, but we've received some data from the client. We add a queue to store any data we get before the connection is set up:

     $heap->{state} = 'connecting';
     $heap->{queue} = [];

Now let's see what happens when data comes in from the client. If we're still awaiting the connection, it gets put in the queue. Otherwise, we send it out through the other wheel to the server:

     sub forward_outbound {
         my ( $heap, $input ) = @_[ HEAP, ARG0 ];
         if ( $heap->{state} eq 'connecting' ) {
             push @{ $heap->{queue} }, $input;
         }
         else {
             $heap->{wheel_server}->put($input);
         }
     }

Once we have set up the connection with the other side, we need to do the same sort of thing again and turn the socket into our third wheel, another ReadWrite wheel.

     sub connected_to_other_side {
         my ( $kernel, $session, $heap, $socket ) =
           @_[ KERNEL, SESSION, HEAP, ARG0 ];
         $heap->{wheel_server} = POE::Wheel::ReadWrite->new
           ( Handle     => $socket,
             Driver     => POE::Driver::SysRW->new,
             Filter     => POE::Filter::Stream->new,
             InputEvent => 'server_input',
           );
     }

We can now run the queue in case anything has built up while we were connecting:

     $heap->{state} = 'connected';
     foreach my $pending ( @{ $heap->{queue} } ) {
         $kernel->call( $session, 'client_input', $pending );
     }
     $heap->{queue} = [];

For each bit of data in the queue, we hand the data back to the client_input event. We use call rather than post, so each piece is handled immediately and in order. This time we are no longer connecting, so the handler passes the data on to the server.
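The queue-then-flush logic can be tried out away from the network. The sketch below is a stand-alone model in plain Perl, not part of the forwarder: $conn stands in for the session's heap, @sent for the server wheel, and the subs outbound and connected are invented names that mirror forward_outbound and connected_to_other_side.

```perl
use strict;
use warnings;

# A stand-alone model of the queue-then-flush logic; no sockets involved.
my @sent;    # stands in for data written to the server wheel
my $conn = { state => 'connecting', queue => [] };

sub outbound {
    my ($c, $input) = @_;
    if ( $c->{state} eq 'connecting' ) {
        push @{ $c->{queue} }, $input;    # not connected yet: hold on to it
    }
    else {
        push @sent, $input;               # connected: pass it straight on
    }
}

sub connected {
    my ($c) = @_;
    $c->{state} = 'connected';
    # Replay queued input in arrival order before any newer data is handled.
    outbound( $c, $_ ) for @{ $c->{queue} };
    $c->{queue} = [];
}

outbound( $conn, "NICK me" );       # arrives before the connection is up
outbound( $conn, "USER me" );       # so does this
connected($conn);                   # connection established: flush the queue
outbound( $conn, "JOIN #perl" );    # goes straight through
print "$_\n" for @sent;
```

The state is switched to connected before the replay, so the replayed items take the direct path instead of being queued again.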

Finally, we need to move data received from the server back down the tunnel to the client, by filling in the forward_inbound subroutine:

     sub forward_inbound {
         my ( $heap, $input ) = @_[ HEAP, ARG0 ];
         $heap->{wheel_client}->put($input);
     }

Let's take a look at a final diagram of the whole forwarder, in Figure 7-3, before we start to look at how to make this even simpler.

Figure 7-3. The completed forwarder




Advanced Perl Programming
ISBN: 0596004567
Year: 2004
Pages: 107
Authors: Simon Cozens
