Copyright Notice
This text is copyright by InfoStrada Communications, Inc., and is used with their permission. Further distribution or use is not permitted. This text has appeared in an edited form in Linux Magazine. However, the version you are reading here is as the author originally submitted the article for publication, not after their editors applied their creativity.
Linux Magazine Column 41 (Oct 2002)
[suggested title: Doing many things at once]
Since the first version of Unix back some three decades ago, the fork system call has been the normal way to get many things to happen at once. Forking is a very nice (some say ``elegant'') model of concurrent execution. Individual processes have entirely separate address spaces, with little chance of interference from other tasks, at the cost of a lot of overhead for interprocess communication.
Starting some dozen years ago or so, user-space ``thread'' libraries have popped up, with various degrees of success. The motivation was to simplify inter-task communication, at the cost of making it easier for tasks to stomp on each other's data and control flow. The Perl developers have taken different stabs at incorporating interfaces to those thread libraries, with some relative success.
But even while that was going on, there were also independent efforts by the Perl user community to try to wrestle with concurrent tasks.
The earliest of those that I recall was simply using the select function to watch for the ``first ready'' I/O (or a timeout) simultaneously, and then dispatching to the proper handling routine. This was later wrapped into IO::Select.
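As a rough illustration of that ``first ready'' style, here is a minimal sketch using the core IO::Select module. The two pipes, the %handler_for table, and the two-second timeout are assumptions made for this example (pipes stand in for slow network connections so that nothing here needs a network).

```perl
use strict;
use warnings;
use IO::Select;
use IO::Handle;

# Two pipes stand in for two slow connections (an assumption made so
# the example runs without any network access).
pipe(my $read_a, my $write_a) or die "pipe: $!";
pipe(my $read_b, my $write_b) or die "pipe: $!";
$_->autoflush(1) for $write_a, $write_b;

# Map each readable handle to the routine that should deal with it.
my %handler_for = (
    fileno($read_a) => sub { "A got: $_[0]" },
    fileno($read_b) => sub { "B got: $_[0]" },
);

my $select = IO::Select->new($read_a, $read_b);

# Only the second pipe has data, so only it should be reported ready.
print {$write_b} "hello\n";

my @results;
# Wait up to 2 seconds for ready handles, then dispatch to the handler.
for my $fh ($select->can_read(2)) {
    my $line = <$fh>;
    chomp $line;
    push @results, $handler_for{ fileno $fh }->($line);
}
print "$_\n" for @results;    # prints "B got: hello"
```

A real program would wrap the can_read call in a loop and keep dispatching until no handles remain; that loop is the part that the later frameworks take over for you.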
And in parallel with that, the Event module was created, using an XS-level interface to handle not only filehandle readiness, but also signals, in an orderly way.
Somewhere in here, a group of developers loosely coordinated by Rocco Caputo started developing and expanding the POE environment, which has become a powerful means to manage concurrent tasks within a process, or even multiple processes. Rather than repeat all the introductory stuff, I'll refer you to http://poe.perl.org for details.
The basic concept with POE at the lowest level is the idea of a kernel that dispatches events to a session's handler. These events are nearly always generated as a result of other handlers having been run, although there are also ``system events'' and ``signal events'' that happen as well. So, you define a session (similar to a thread in threading or a process in Unix), set up its handlers mapped to events, and then let the kernel sort out the events.
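To make the dispatch idea concrete, here is a toy model in plain Perl. It is not POE's implementation: the @queue, %handlers, post_event, and run_kernel names are all invented for this sketch, and there is only one ``session''. It shows only the shape of the idea, in which handlers run one at a time and most events come from other handlers.

```perl
use strict;
use warnings;

# A toy dispatcher, NOT POE itself: a FIFO of pending events and a
# table mapping event names to handlers. All names are hypothetical.
my (@queue, @log, %handlers);

sub post_event { push @queue, [@_] }    # [event name, args...]

%handlers = (
    # The first event; it generates the next one.
    _start => sub { push @log, "started"; post_event(count => 1) },
    # Each run of this handler may queue another run of itself.
    count  => sub {
        my ($n) = @_;
        push @log, "count $n";
        post_event(count => $n + 1) if $n < 3;
    },
);

sub run_kernel {
    post_event('_start');
    # Keep dispatching until no events remain, then return.
    while (my $event = shift @queue) {
        my ($name, @args) = @$event;
        $handlers{$name}->(@args);
    }
}

run_kernel();
print "$_\n" for @log;    # started, count 1, count 2, count 3
```

Note that run_kernel returns only once the queue is empty, which mirrors the way a real event loop ends when nothing is left to do.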
Starting from this lowest level, many ``components'' and ``wheels'' have been written that are encapsulations of common patterns, such as watching a file growing or a directory with incoming items, or managing connections to a TCP port. And on top of this you'll also find ready-to-use-code for HTTP servers, IRC connections, and HTTP web page fetches.
POE definitely has some interesting uses, especially for networking code, and fits well in the ``Perl as glue'' model. POE also plays nicely with existing Event loops, as well as Tk, Gtk, and Curses programs.
In looking at POE, I wanted to flex my programming muscles on a typical task to get my head around POE's ``event programming'' model. I decided to tackle a familiar task: checking web links. But why is this a good candidate for a concurrent set of operations?
Well, a naive web link checker will fetch a page, wait for it to return, then note the success, and perhaps look at the page for further links to follow, then move on to the next page until it's done. But that means the program is spending most of its time waiting for I/O (or a DNS lookup). So while it's doing that, it could instead go fetch a different page, but that's where it gets messy.
In the past, I've used LWP::Parallel and LWP::UA and forking and pre-forking to tackle this problem, each with its own advantages and disadvantages. And so I decided to try this with POE as well.
The design is simple. Starting at a top-level URL, fetch that page. If the page is available, and has HTML links, recursively scan those links, until there's no more to do. To keep it fast, concurrently scan as many as 10 links at a time, so that the I/O wait or DNS wait time of one fetch overlaps the CPU hard stuff of another. And after a couple of hours of reading the docs (and bugging Rocco online), I have [listing one, presented below].
Lines 1 through 3 start nearly every program I write, turning on warnings and compiler restrictions, and disabling the normal block-oriented buffering of STDOUT.
Line 5 defines a constant for the name of the POE session representing the user agent. We need this as a constant to enable the individual checker sessions to post the page fetch request.
Line 6 pulls in POE, and brings in POE::Component::Client::HTTP as well. The POE module has a non-standard import routine, so the names given are additional POE modules to load, not symbols to be imported as in most other modules.
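For readers who haven't met a custom import routine before, the following sketch shows the general trick in plain Perl. The My::Framework packages are hypothetical stand-ins, not POE code, and because everything lives in one file the sketch only records the module name where a real implementation would load the file with require.

```perl
use strict;
use warnings;

# Hypothetical stand-ins for a framework and one of its submodules.
package My::Framework::Component::Echo;
sub echo { return "echo: $_[1]" }

package My::Framework;
our @LOADED;
sub import {
    my ($class, @names) = @_;
    for my $name (@names) {
        # Treat each argument as a submodule name, not as a symbol
        # to export into the caller.
        my $module = "${class}::$name";
        # A real implementation would require the module's file here;
        # this sketch only records the computed name.
        push @LOADED, $module;
    }
}

package main;

# "use My::Framework qw(Component::Echo)" would make this same call
# at compile time if the package lived in its own file.
My::Framework->import(qw(Component::Echo));

print "loaded: @My::Framework::LOADED\n";
print My::Framework::Component::Echo->echo("hi"), "\n";   # echo: hi
```

The point is only that the argument list of a use statement means whatever the module's import routine decides it means.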
Line 8 defines the constant for the top-level URL to be checked. This is a very simple checker that looks at the URL given in $TOP, descends through all HTML pages that are below this page, and verifies (without examining the contents) any URLs that are outside this tree. The URL given here is the Google view of the DMOZ entry for Perl, altered ever-so-slightly so that people won't run this program without first reading this description. I think you'll note that the change is obvious.
Lines 10 and 11 set up the two top-level sessions. First, the user agent session is created, given the session name in POCO_HTTP, and a timeout of 30 seconds. Then, the master session is spawned, placing two values into the session's ``heap''. The role of the master session is to manage the task queue and the number of child checker sessions.
Once the two sessions are spawned, we ``boot'' the POE kernel in line 12. This call to run will not return as long as there are sessions with pending or potential events, as determined by the POE kernel. And once we're done, we exit in line 13. (How appropriate.)
There. All done. Oh wait, how does the master session do its job? That's next, beginning in line 15. I've set up a BEGIN block from lines 15 to 75 to define the class, similar to how a use would bring it in. Line 16 establishes the package name of this module, and line 17 defines the constants needed for a POE session.
Lines 19 to 25 are invoked as a class method, as we did in line 11. The classname gets shifted off in line 20, and the new session is created by calling POE::Session's create method. This has many, many parameters, but we need only two: the ``states'' that we'd like to register, and some initial values for the ``heap''.
In this case, we'll register _start, ready, and done as our three interesting events, mapping to subroutines of the same name. (Events can be mapped to arbitrary subroutines, or even method calls instead, but we'll keep it simple here.)
The ``heap'' is a set of local named parameters belonging to each session. By inserting values into the heap as the session is spawned, we've effectively given named arguments directly into the session. As an alternate design, we could arrange to pass items into the _start event instead, but we'd just end up storing those into the heap anyway, so this skips that copying step. Note that UA and TODO come from line 11, but KIDMAX and KIDS are defined here. We'll see how all four of these items are used in a minute.
Lines 27 to 33 define the subroutine for the _start event. The POE kernel always sends this event as the session is being spawned. The parameters for an event handler are given as offsets into the @_ array, defined by a list of names. The value of HEAP gives us the proper item of @_ representing the heap hashref, which we copy into $heap in line 28.
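The named-offset convention is easy to try without POE installed. In the sketch below the constant values are assumptions chosen for illustration (POE::Session exports its own constants, with different values), and handler is a made-up routine called directly rather than by a kernel.

```perl
use strict;
use warnings;

# Assumed offsets, for illustration only; the real constants come
# from POE::Session and have different values.
use constant { KERNEL => 0, HEAP => 1, ARG0 => 2, ARG1 => 3 };

sub handler {
    my $heap = $_[HEAP];                     # one element, by name
    my ($first, $second) = @_[ARG0, ARG1];   # an array slice, by name
    $heap->{seen}++;                         # the heap persists between calls
    return "$first/$second";
}

my %heap;
my $result = handler("kernel-placeholder", \%heap, "alpha", "beta");
print "$result, seen=$heap{seen}\n";         # alpha/beta, seen=1
```

Indexing by name means a handler picks out only the fields it cares about, without having to know the position of every other field.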
The element TODO of the heap gives an arrayref of URLs that are left to be checked. Initially, it's just the top-level URL from line 8 (via line 11 and line 24). For each item in this list, we'll make it ``canonical'' by passing it through the make_canonical subroutine (defined later), and then create a hash with that key and a value of 1. The hash is used to ensure that we look at a given URL only once. The hash is kept in the heap hash under a key of DONE.
Once we have a todo list, and a done hash, it's time to start spawning the child sessions to check the URLs. We'll do that by a request to yield to the ready event. This instructs the kernel that it can now call that subroutine the next time it's looking for something to do. The parameter initial is passed to the subroutine as one of the user parameters, strictly for debugging and tracing. Had this routine not yielded, the kernel would have recognized that there were no further events, and exited immediately.
The ready event triggers the same-named subroutine, defined in lines 35 to 49. Line 36 pulls out the debugging tag (such as initial from above) for a tracing message. I've commented this out in this version, but you can uncomment it while you're playing to see when ready gets called, and from where.
The heap and the kernel are pulled out of the subroutine argument list in lines 37 and 38. Lines 39 and 40 decide if we have any chance to launch a new checker session, and if we have anything to do in that case. We can't exceed KIDMAX number of sessions, and we won't have anything to do if the TODO list is empty.
If we make it to line 41, it's time to probe a new URL. Again, the tracing message is commented out. Spawning the new session consists of counting it (line 42), launching the session (lines 43 to 47), and then looping back with another event to this same handler. Without the loop, we'd never have multiple checker sessions running.
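The throttling logic can be exercised on its own. The sketch below is a simulation in plain Perl rather than POE code: a simple array of coderefs plays the part of the event queue, pushing onto @started stands in for spawning a checker, and KIDMAX is lowered to 2 so the limit is easy to see. The drain helper and the sample items are hypothetical.

```perl
use strict;
use warnings;

my %heap = (KIDMAX => 2, KIDS => 0, TODO => [qw(a b c)]);
my (@queue, @started);

sub ready {
    return if $heap{KIDS} >= $heap{KIDMAX};           # every slot is busy
    return unless defined(my $item = shift @{ $heap{TODO} });  # nothing queued
    $heap{KIDS}++;
    push @started, $item;     # stands in for spawning a checker session
    push @queue, \&ready;     # loop back and try to fill another slot
}

sub done {
    $heap{KIDS}--;            # a slot has opened up
    push @queue, \&ready;
}

# Run queued events until none remain.
sub drain { while (my $event = shift @queue) { $event->() } }

push @queue, \&ready;
drain();
my @first_wave = @started;
print "first wave: @first_wave\n";    # first wave: a b

push @queue, \&done;                  # one child reports back
drain();
print "started so far: @started\n";   # started so far: a b c
```

Each call starts at most one item and then re-queues itself, so the limit is checked afresh every time instead of being managed by an inner loop.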
The checker session is launched similar to the master session, by calling the class method spawn, passing it initial values for the heap. The UA is simply a copy of the master session's UA heap variable, while the URL is the URL to check. The third heap value is POSTBACK, a postback coderef. When the child session is finished, it will invoke this coderef. This coderef will invoke the done event, passing it two parameters: an array ref consisting of all the parameters given during the creation of the postback (here, the $url value), and a second array ref giving all of the parameters given in the postback's invocation. Yes, it's a method call that returns a coderef, that when invoked, invokes yet a different subroutine. Powerful and useful, but perhaps a bit of work to get your head around. Hopefully, the upcoming example will make it clear.
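The two-arrayref calling convention can be mimicked with an ordinary closure. The make_postback helper below is hypothetical and much simpler than the real thing: it calls the handler directly, whereas a POE postback posts an event to the creating session. The example.com URLs are placeholders.

```perl
use strict;
use warnings;

my @received;

# Stand-in for the master session's "done" handler.
sub done {
    my ($creation_args, $call_args) = @_;
    push @received,
        "url=$creation_args->[0] links=@{ $call_args->[0] }";
}

# Hypothetical helper: a toy version of what a postback provides.
sub make_postback {
    my ($handler, @creation_args) = @_;
    # The returned coderef remembers the creation-time arguments and
    # bundles them with whatever it is later called with.
    return sub { $handler->([@creation_args], [@_]) };
}

# The "master" creates the postback, remembering which URL it is for...
my $postback = make_postback(\&done, "http://example.com/");

# ...and the "child" later invokes it with the links it found.
$postback->(["http://example.com/a", "http://example.com/b"]);

print "$_\n" for @received;
# url=http://example.com/ links=http://example.com/a http://example.com/b
```

The child never needs to know who created the coderef or which handler it reaches; it simply calls it with its results.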
In fact, let's skip over the description of done until we see how it would have been triggered. So, skipping down to lines 77 to 125, we see the spawn method defined starting in line 81. Again, we're expecting two events, called _start and response, and these are both mapped to their same-named subroutines. The heap is created from the incoming parameters, as defined earlier in lines 44 to 46.
When the _start event is triggered by the kernel, we'll pull in the HTTP::Request::Common module to create the LWP request that fetches the given URL. As this is a require, it reads the module only the first time. The $request object in line 93 is then ``posted'' to the user agent session in line 94, using its request event (the second element of the method call). The third element names the event that the user agent session will trigger in the checker session when it is done: response. Finally, we include the LWP request object.
After posting the request to the user agent session, the checker session has no active events, and will remain idle. The user agent will fetch the page, and then post a response event, which triggers the subroutine starting in line 97. The first two ``user'' arguments to this process are the request object packet, and the response object packet. Peering inside those, we can extract the request and response LWP objects, into $request and $response in lines 100 and 101 respectively.
Line 103 defines the ``return value'' from this event, which will be 0 or more links to additionally check.
Starting in line 105, we decide what to do with and report about this response. If it's a successful fetch (line 105) and the URL was inside the tree of interest (line 106) and it's an HTML page (line 107), then we need to parse the HTML to pull out any further links worth checking. Again, leveraging off yet another CPAN module, we'll grab HTML::SimpleLinkExtor to do the job. The ``base url'' for relative links is passed while creating the extractor object in line 109. Line 110 looks at the response content for links of interest. Line 111 grabs those links as a list into @links, being careful to grab only HTTP links, and not HTTPS or email or FTP links. Again, a debugging trace line has been commented out in line 112.
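The two pattern matches that do the filtering are worth trying on sample data. In this sketch the $TOP value and the list of links are made up; the two patterns are the same ones used in lines 106 and 111 of the listing.

```perl
use strict;
use warnings;

my $TOP = "http://www.example.com/docs/";    # hypothetical tree root

my @found = (
    "http://www.example.com/docs/intro.html",
    "https://www.example.com/docs/secure.html",
    "mailto:someone\@example.com",
    "ftp://ftp.example.com/pub/file.tar.gz",
    "http://other.example.org/",
);

# Keep only plain HTTP links, as in line 111.
my @links = grep m{^http:}, @found;

# Of those, which are inside the tree of interest, as in line 106?
# \Q keeps the dots in $TOP from acting as regex wildcards.
my @inside = grep m{^\Q$TOP}, @links;

print "http links: ", scalar(@links), "\n";    # http links: 2
print "inside tree: @inside\n";
# inside tree: http://www.example.com/docs/intro.html
```

The other.example.org link survives the first filter but not the second, which is exactly the kind of link the checker verifies without descending into.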
If the page wasn't HTML, we note that in line 114. If it was outside the tree of interest, we end up in line 117. These are the links that we wanted only to verify that they could be fetched, and so we note that.
Finally, if we end up in line 120, we've got a bad link, and we report the error. Some of the reported errors are actually ``redirects'', but if they're always a redirect, maybe we should change our link to point at the redirected page. At that point, I was getting distracted from the main purpose of this example, which is to show how to make POE work, so I'm leaving that as an exercise for the reader.
And now for that postback. The coderef contained in $_[HEAP]{POSTBACK} is called, passing it the arrayref pointing to the 0 or more links to be followed as a result of examining this page. This postback is like a post call, but against the session that created the postback (in this case, the master session), triggering an event in that session (in this case, ``done''), and passing it two user parameters.
Let's skip up to done to see how that works. First, we pull out the heap, request list, and response list in lines 52 and 53. The request list is the list created in line 46, which is just the URL. The response list is the list of links to be visited, created down in line 123. These are extracted in lines 55 and 56.
For each of the links that we found for this URL, we'll force them to be ``canonical'' (given in a moment), and then add them to the todo list (line 60) unless we've already seen them before (line 61).
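The ``unless we've seen it'' test relies on a common Perl idiom: a post-increment on a hash element returns the old value, so it is false only the first time a key appears. Here is a small sketch with made-up URLs, where %done starts out already holding the top-level page:

```perl
use strict;
use warnings;

my %done = ("http://www.example.com/" => 1);    # already queued once
my @todo;

for my $link (qw(
    http://www.example.com/
    http://www.example.com/a.html
    http://www.example.com/a.html
    http://www.example.com/b.html
)) {
    # $done{$link}++ yields the old count: false the first time a
    # link is seen, true on every later sighting.
    push @todo, $link unless $done{$link}++;
}

print "$_\n" for @todo;
# http://www.example.com/a.html
# http://www.example.com/b.html
```

Because the test and the bookkeeping happen in a single expression, there is no window in which a link could be queued twice.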
Because a postback call would happen only when the child was done (or nearly done, with only cleanup to do on the way out), we now have one fewer ``active'' kid, noted in line 64. And since that might mean that it's time to fire up another URL fetching, we'll again note the ready event in line 65, this time annotating it with ``child done'' for the reason.
Now, for that make_canonical routine, defined in lines 68 to 73. Given a string representing a URL, we want to clean it up, and remove any ``fragment identifier'' (the part after the #). We'll do this again with a CPAN module (part of LWP).
And that's all there is to it. It's a toy program, but fully functional, and if you run it as-is (with the one modification noted above), you'll discover exactly how many of the DMOZ entries in the Perl section are still valid.
To be practical, we'd probably want a lot more control over deciding whether a URL is to be followed if it contains HTML, or just probed, or ignored, and probably some summary report at the end. But that's not the point of this demonstration, as an introduction to POE.
Listing
        =1=     #!/usr/bin/perl -w
        =2=     use strict;
        =3=     $|++;
        =4=
        =5=     use constant POCO_HTTP => "ua";
        =6=     use POE qw(Component::Client::HTTP);
        =7=
        =8=     my $TOP = "http://directory.google.comm/Top/Computers/Programming/Languages/Perl/";
        =9=
        =10=    POE::Component::Client::HTTP->spawn(Alias => POCO_HTTP, Timeout => 30);
        =11=    POE::Component::My::Master->spawn(UA => POCO_HTTP, TODO => [$TOP]);
        =12=    $poe_kernel->run;
        =13=    exit 0;
        =14=
        =15=    BEGIN {
        =16=      package POE::Component::My::Master;
        =17=      use POE::Session;           # for constants
        =18=
        =19=      sub spawn {
        =20=        my $class = shift;
        =21=        POE::Session->create
        =22=          (package_states =>
        =23=           [$class => [qw(_start ready done)]],
        =24=           heap => {KIDMAX => 10, KIDS => 0, @_});
        =25=      }
        =26=
        =27=      sub _start {
        =28=        my $heap = $_[HEAP];
        =29=        for (@{$heap->{TODO}}) {
        =30=          $heap->{DONE}{$_ = make_canonical($_)} = 1;
        =31=        }
        =32=        $_[KERNEL]->yield("ready", "initial");
        =33=      }
        =34=
        =35=      sub ready {
        =36=        ## warn "ready because $_[ARG0]\n";
        =37=        my $heap = $_[HEAP];
        =38=        my $kernel = $_[KERNEL];
        =39=        return if $heap->{KIDS} >= $heap->{KIDMAX};
        =40=        return unless my $url = shift @{$heap->{TODO}};
        =41=        ## warn "doing: $url\n";
        =42=        $heap->{KIDS}++;
        =43=        POE::Component::My::Checker->spawn
        =44=            (UA => $heap->{UA},
        =45=             URL => $url,
        =46=             POSTBACK => $_[SESSION]->postback("done", $url),
        =47=            );
        =48=        $kernel->yield("ready", "looping");
        =49=      }
        =50=
        =51=      sub done {
        =52=        my $heap = $_[HEAP];
        =53=        my ($request,$response) = @_[ARG0,ARG1];
        =54=
        =55=        my ($url) = @$request;
        =56=        my @links = @{$response->[0]};
        =57=
        =58=        for (@links) {
        =59=          $_ = make_canonical($_);
        =60=          push @{$heap->{TODO}}, $_
        =61=            unless $heap->{DONE}{$_}++;
        =62=        }
        =63=
        =64=        $heap->{KIDS}--;
        =65=        $_[KERNEL]->yield("ready", "child done");
        =66=      }
        =67=
        =68=      sub make_canonical {        # not a POE
        =69=        require URI;
        =70=        my $uri = URI->new(shift);
        =71=        $uri->fragment(undef);    # toss fragment
        =72=        $uri->canonical->as_string; # return value
        =73=      }
        =74=
        =75=    }                             # end POE::Component::My::Master
        =76=
        =77=    BEGIN {
        =78=      package POE::Component::My::Checker;
        =79=      use POE::Session;
        =80=
        =81=      sub spawn {
        =82=        my $class = shift;
        =83=        POE::Session->create
        =84=          (package_states =>
        =85=           [$class => [qw(_start response)]],
        =86=           heap => {@_});
        =87=      }
        =88=
        =89=      sub _start {
        =90=        require HTTP::Request::Common;
        =91=        my $heap = $_[HEAP];
        =92=        my $url = $heap->{URL};
        =93=        my $request = HTTP::Request::Common::GET($url);
        =94=        $_[KERNEL]->post($heap->{UA}, 'request', 'response', $request);
        =95=      }
        =96=
        =97=      sub response {
        =98=        my $url = $_[HEAP]{URL};
        =99=        my ($request_packet, $response_packet) = @_[ARG0, ARG1];
        =100=       my ($request, $request_tag) = @$request_packet;
        =101=       my ($response) = @$response_packet;
        =102=
        =103=       my @links;
        =104=
        =105=       if ($response->is_success) {
        =106=         if ($response->base =~ m{^\Q$TOP}) {
        =107=           if ($response->content_type eq "text/html") {
        =108=             require HTML::SimpleLinkExtor;
        =109=             my $e = HTML::SimpleLinkExtor->new($response->base);
        =110=             $e->parse($response->content);
        =111=             @links = grep m{^http:}, $e->links;
        =112=             ## warn "parsed: $url\n";
        =113=           } else {
        =114=             ## warn "not HTML: $url\n";
        =115=           }
        =116=         } else {
        =117=           warn "valid: $url\n";
        =118=         }
        =119=       } else {
        =120=         warn "BAD (", $response->code, "): $url\n";
        =121=       }
        =122=
        =123=       $_[HEAP]{POSTBACK}(\@links);
        =124=     }
        =125=   }                             # end POE::Component::My::Checker