Copyright Notice
This text is copyright by InfoStrada Communications, Inc., and is used with their permission. Further distribution or use is not permitted. This text has appeared in an edited form in Linux Magazine. However, the version you are reading here is as the author originally submitted the article for publication, not after their editors applied their creativity.
Please read all the information in the table of contents before using this article.
Linux Magazine Column 15 (Aug 2000)
[Suggested title: Getting your kids to do the work]
Have you ever gone out into the workshop to make something interesting, only to find that the workbench you want to use is too short or long or not high enough or doesn't have clamps in the right places or maybe it's just too uneven? So then you sit down and spend some time first creating a good workbench, in hopes that this will support (literally) your work in creating the thing you had started out to make.
Well, that's what happened to me the other day, in my virtual workshop. I started out intending to make this month's column about a parallel link checker that verifies that my website has good links, both internally and externally. And I wanted to try something new, since this topic has been beaten to death both by me and by other authors.
The new angle I wanted was to use forked kids to do the lookup and processing for each page, which a parent process could then manage (a bit like herding cats, but close enough). The parent would try to keep all the kids busy, and coordinate the results in the event that a kid got distracted. Also, in the case of link verification, each kid response was likely to point out other links to verify, so some way of coordinating the whole shebang was also needed in the parent.
So, I rummaged around for the right event loop or RPC package. I figured this was already a frequently-solved problem, so there was no point in reinventing the wheel. I investigated the Event package, POE, and most of the stuff in the RPC:: and IPC:: sections of the CPAN, and was astounded to find that nobody seemed to have tackled this particular problem in the way that I wanted it done. Or at least, not in a way that I could easily find.
I even dropped onto my favorite IRC channel, #perl on EFnet, and got a bunch of ``oh that's simple, you just do X'' responses, but when I challenged the regulars to write the ``15 lines'' of code they claimed it would take, I found that they had not addressed one or more of the many important issues. For example, each kid will take a varying length of time, so you can't just deal the tasks out evenly. You have to give each kid its next task as it finishes the previous one. And kids might go away or get hung, so you need to resubmit a task to another kid after some timeout. Also, the parent needs to see the results in near-realtime so that new requests can be generated from old responses rapidly. And the data going to and from the kids has to be arbitrarily sized, and be compatible with one of the marshallers like Storable.
So, I set out to write my own kid task manager, and the result, including a test harness, is [listing one, below]. I'm quite pleased with the result, but I've only built the workbench so far. Next month, I'll build the link checker using this code, so stay tuned. (As the code is fairly long, I won't be doing my usual line-by-line commentary, but instead will hit the important points.)
Lines 5 through 17 set up wrappers around the warn and die functions. Because we fork, we'll want to know which process (the parent or one of the kids) sent the message, so we'll add it. And as long as we're hacking the message, a little timestamp prefix won't hurt either.
Lines 19 to 73 set up the test harness for the code, which really begins at line 77. The test starts with a single task to be executed, numbered 1 and named task00001. (Each task name must be unique.) The task data consists of the task number and a ``payload'' of varying length. The task name, task number, and payload are received by the child as part of the remote procedure call. The child returns the unmodified payload along with two new task numbers: the original task number times 2, and one more than that. So task 1 triggers tasks 2 and 3. Task 2 triggers tasks 4 and 5. Task 3 triggers tasks 6 and 7. But in all likelihood, tasks 2 and 3 will be executing in parallel, as will 4, 5, 6, and 7. So the tasks to be processed will increase exponentially until the $MAX value, defined in line 25, is reached.
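The fan-out rule is easy to model on its own. A sketch in Python (my function name, not the column's) of what each pseudo-task proposes, with the parent's $MAX cutoff applied:

```python
def child_tasks(n, MAX=2048):
    """Task n proposes tasks 2n and 2n+1, as in the test harness's
    KidTask; the parent only queues those still below MAX."""
    return [m for m in (2 * n, 2 * n + 1) if m < MAX]
```

Starting from task 1 and repeatedly expanding, this doubling scheme eventually visits every task number from 1 up to MAX - 1, which is what makes it a nice exerciser for the queue: the workload grows exponentially and then drains.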
Lines 19 to 22 generate a specific task, calling the add_task routine defined in the kid task manager. The first parameter is the task name, and must be a unique string. The remaining parameters are passed to Storable, and can be any permitted data structure.
Once the first task is queued (in line 24), the main running loop is started, using the rather largish run_queue invocation in lines 29 to 69. The parameters are passed in ``key/value'' order, starting with a commented-out Trace value of 1 (in line 30). Uncommenting this line causes a lot of extra tracing of child process launches and terminations, which was helpful during debugging.
Next, in line 31, we've got the KidMax value. This is the number of parallel processes to execute. If the processes are mostly in I/O wait (like a DNS lookup, or waiting for some remote web page to respond), we can set this fairly high and still not be a bad neighbor to the other processes on the box. Experimentation is the key here. I'll probably have a lot more to say about this number when I start running my web link checker using this code.
Lines 32 to 48 define the activity of each kid once it gets a task. The value is a coderef, here defined by an anonymous subroutine. The incoming parameters are the unmarshalled values that had been handed to add_task, item for item. It's important to remember here that the kid doesn't have access to the parent's memory space, although any global data or subroutines set up prior to the beginning of run_queue are correctly copied along to the kids. For the test harness, we've got three tweakable simulator items: a CPU waster (lines 35 to 38), a time waster (lines 39 to 41), and a flakiness generator (lines 42 to 44). These can be disabled or enabled by inserting or removing a 0 and in front of the description strings, like I've done to disable the CPU waster.
Finally, lines 46 and 47 handle the pseudo-task. The incoming $n is returned as 2 * $n and 2 * $n + 1, while the payload string is passed through unchanged. Not rocket science here, because I was more interested in ensuring that large data would be correctly handed to and from the kids.
Lines 49 to 68 define the result task back in the parent, again as an anonymous subroutine (actually a closure at this point, because it refers to %results and $MAX from the outer scope). The incoming parameters in line 50 are the task name and any list returned by the child. Again, this response list has gone through a marshalling step, but Storable does a nice job here. Lines 52 to 59 take care of queuing up the new tasks, provided this is the first response we've gotten on this task number. Why might we get multiple responses? Well, I've set up the kid manager to resubmit any tasks for which we haven't seen a response yet, and there are times when the same task will be requeued to ensure completion. So we also get the occasional duplicates when two active kids were given the same task. Oh well.
Lines 61 to 67 dump out a response in a controlled way, especially when there's a huge payload, by truncating any long strings, similar to the output of the DBI module when tracing interactions with the database.
And finally, to check the duplication or absence of task completion, lines 71 through 73 display how many times each task result was seen.
So, that's the test harness; now let's see how the work gets done.
Lines 77 to 80 bring in some modules. The first three are from the standard distribution, and the fourth is from the CPAN. Lines 82 through 109 define the subroutines that are ``task manager'' related, which need access to both the %tasks hash, containing the tasks to be sent to the kids, and a @queue cache of task keys, used to deal out the tasks fairly. The only subroutine here meant to be called outside the package is add_task, which adds an entry to the task hash.
Lines 100 to 108 define the next_task subroutine, which deals out the tasks in the order specified by @queue. Initially, the queue is empty, so a sorted list of keys serves to initialize it. Items are removed from the queue one by one, and the queue is refreshed from the current list of keys when emptied. This is a simplistic model, and leads to duplicate task queuing, but was the easiest thing I could think of without having to keep track of timeouts for all the tasks, getting into all sorts of messiness.
Lines 111 to 219 define the kid manager. Lines 112 through 116 provide the ``regional'' (between global and local) variables for these subroutines. %kids is a simple hash keyed by process ID, with a value of an arrayref containing the send and receive IO handles. The next four variables ($kid_max, $kid_task, $result_task, and $trace) hold the values given to run_queue by the caller. Note that two of these have default values, but the others are expected to be given at the invocation.
Speaking of run_queue, that's the meat of the package here, defined in lines 118 to 173. The first eight lines grab the parameters and save them away for later use. The main loop starting in line 129 handles the various phases of the event processing sequence. First, any dead kids are harvested in lines 132 to 135. Second, any kids that don't show up on the radar are forgotten about in lines 137 to 141 (this can happen only if something in the ResultTask also forks and/or waits for kids). Third, any tasks that are waiting for a new kid to be forked can be shipped out in lines 143 to 147. Finally, any ready responses are detected in the bottom part of the loop, beginning in line 149. To achieve this, we set up an IO::Select object with all the read handles for the kids, and wait for up to 1 second for any of them to become ready. For each one that is, we read the length word for the response (the first 4 bytes) and then the response itself. On a good thawing (line 157), the task is removed as a completed task, and the ResultTask is called.
As we now have an idle kid, we decide whether to feed it (line 161) or kill it (line 163). Can't have them just sitting around, ya know. Lines 168 and 169 get triggered only when the kid didn't say the right thing, meaning we've gotten out of sync or something serious has happened. And line 171 keeps the loop running as long as we have tasks or kids to manage.
Lines 175 to 191 fire off a kid, via a fork with pipes set up for each end. Even though a pipe can block if full, our algorithm ensures that when reading or writing the pipe, it's OK if we block, because it'll be for only a very short time. (That was one of the cool things I came up with when coding this particular code chunk.) The return value is the process ID of the kid that just got created. If forking fails in line 178, the parent will simply make do with the kids it has, reusing the existing kids, and retrying the fork on the next loop automatically (cool).
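The fork-with-two-pipes dance is the same in any POSIX language. A Python sketch of it (function names mine; os.pipe and os.fork standing in for IO::Pipe and Perl's fork), returning None when the fork fails so the caller can make do with the kids it already has:

```python
import os

def create_kid(kid_main):
    """Fork a child connected by two pipes, like the listing's
    create_kid. In the parent, returns (pid, to_kid_fd, from_kid_fd);
    the child runs kid_main(read_fd, write_fd) and exits.
    Returns None if the fork fails."""
    to_kid_r, to_kid_w = os.pipe()       # parent writes, kid reads
    from_kid_r, from_kid_w = os.pipe()   # kid writes, parent reads
    try:
        pid = os.fork()
    except OSError:
        return None                      # retry on a later loop pass
    if pid == 0:                         # I'm the kid
        os.close(to_kid_w)
        os.close(from_kid_r)
        kid_main(to_kid_r, from_kid_w)
        os._exit(0)
    os.close(to_kid_r)                   # parent keeps the other ends
    os.close(from_kid_w)
    return pid, to_kid_w, from_kid_r
```

Note that each side closes the pipe ends it doesn't use, which is what the reader/writer calls do implicitly in the IO::Pipe version.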
Lines 193 to 200 handle the parent-to-kid transmission, simply by wrapping a print inside a block that ignores SIGPIPE. If a kid dies before the message is fully delivered, we don't really care, because we'll see the dead kid on the next loop, and requeue his task automatically when we're done with a sweep of the task queue. Simplicity wins here.
Finally, lines 202 through 218 give the main kid loop. Repeatedly, the kid waits for a 4-byte length, followed by a Storable-encoded list. The list is decoded and handed to the KidTask subroutine, and the results are encoded and returned in a similar style.
Wow. Long pile of code. But useful, and fun, and efficient. Go ahead and play with this for now, but then remember, this is just the workbench. Next month comes the real project! Until then, enjoy.
Listings
=1= #!/usr/bin/perl -w
=2= use strict;
=3= $|++;
=4=
=5= $SIG{__WARN__} = sub {
=6= my $message = shift;
=7= my $prefix = "[$$] [".localtime()."] ";
=8= $message =~ s/^/$prefix/gm;
=9= warn $message;
=10= };
=11=
=12= $SIG{__DIE__} = sub {
=13= my $message = shift;
=14= my $prefix = "[$$] [".localtime()."] ";
=15= $message =~ s/^/$prefix/gm;
=16= die $message;
=17= };
=18=
=19= sub generate_task_number_n {
=20= my $n = shift;
=21= add_task(sprintf("task%05d", $n), $n, "xxxxxxxxxxxx" x $n);
=22= }
=23=
=24= generate_task_number_n(1);
=25= my $MAX = 2048;
=26=
=27= my %results;
=28=
=29= run_queue(
=30= ## Trace => 1,
=31= KidMax => 15,
=32= KidTask => sub {
=33= my($key, @values) = @_;
=34=
=35= if (0 and "simulate child burn") {
=36= my $target_cpu = (times)[0] + 0.1 + rand 0.2;
=37= 1 while (times)[0] < $target_cpu;
=38= }
=39= if ("simulate child delay") {
=40= select(undef, undef, undef, 0.1 + rand 0.3);
=41= }
=42= if ("simulate child abort") {
=43= die "simulating child abort" unless rand > 0.05;
=44= }
=45=
=46= my($n, @payload) = @values;
=47= return (2 * $n, 2 * $n + 1, @payload);
=48= },
=49= ResultTask => sub {
=50= my($key, @responses) = @_;
=51=
=52= my($new_2n, $new_2n_plus_1) = @responses;
=53= if ($results{$key}++) {
=54= print "DUPLICATE ";
=55= } else {
=56= for ($new_2n, $new_2n_plus_1) {
=57= generate_task_number_n($_) if $_ < $MAX;
=58= }
=59= }
=60=
=61= print
=62= "RESULT for $key => ",
=63= join(", ",
=64= map { (length > 32)
=65= ? substr($_, 0, 29)."... ".(length() - 29)
=66= : $_ }
=67= @responses), "\n";
=68= },
=69= );
=70=
=71= for (sort keys %results) {
=72= print "$_ => $results{$_}\n";
=73= }
=74=
=75= ### forking task manager from here down
=76=
=77= use IO::Select;
=78= use IO::Pipe;
=79= use POSIX qw(WNOHANG);
=80= use Storable qw(freeze thaw);
=81=
=82= BEGIN { # task manager
=83= my %tasks;
=84=
=85= my @queue;
=86=
=87= sub add_task { ## external
=88= my $key = shift;
=89= $tasks{$key} = [@_];
=90= }
=91=
=92= sub remove_task {
=93= delete $tasks{+shift};
=94= }
=95=
=96= sub task_count {
=97= scalar keys %tasks;
=98= }
=99=
=100= sub next_task {
=101= return undef unless task_count() > 0;
=102= {
=103= @queue = sort keys %tasks unless @queue;
=104= my $key = shift @queue;
=105= redo unless exists $tasks{$key}; # might have disappeared
=106= freeze([$key, @{$tasks{$key}}]);
=107= }
=108= }
=109= }
=110=
=111= BEGIN { # kid manager
=112= my %kids;
=113= my $kid_max = 5;
=114= my $kid_task;
=115= my $result_task;
=116= my $trace = 0;
=117=
=118= sub run_queue { ## external
=119= {
=120= my %parms = @_;
=121= $kid_max = delete $parms{KidMax} if exists $parms{KidMax};
=122= $kid_task = delete $parms{KidTask} if exists $parms{KidTask};
=123= $result_task = delete $parms{ResultTask} if exists $parms{ResultTask};
=124= $trace = delete $parms{Trace} if exists $parms{Trace};
=125= die "unknown parameters for run_queue: ", join " ", keys %parms
=126= if keys %parms;
=127= }
=128=
=129= {
=130= warn "to go: ", task_count() if $trace;
=131= ## reap kids
=132= while ((my $kid = waitpid(-1, WNOHANG)) > 0) {
=133= warn "$kid reaped" if $trace;
=134= delete $kids{$kid};
=135= }
=136= ## verify live kids
=137= for my $kid (keys %kids) {
=138= next if kill 0, $kid;
=139= warn "*** $kid found missing ***"; # shouldn't happen normally
=140= delete $kids{$kid};
=141= }
=142= ## launch kids
=143= if (task_count() > keys %kids and
=144= keys %kids < $kid_max and
=145= my $kid = create_kid()) {
=146= send_to_kid($kid, next_task());
=147= }
=148= ## see if any ready results
=149= READY:
=150= for my $ready (IO::Select->new(map $_->[1], values %kids)->can_read(1)) {
=151= ## gotta brute force this, grr, good thing data is small...
=152= my ($kid) = grep $kids{$_}[1] == $ready, keys %kids;
=153= {
=154= last unless read($ready, my $length, 4) == 4;
=155= $length = unpack "L", $length;
=156= last unless read($ready, my $message, $length) == $length;
=157= $message = thaw($message) or die "Cannot thaw";
=158= remove_task($message->[0]);
=159= $result_task->(@$message);
=160= if (task_count() >= keys %kids) {
=161= send_to_kid($kid, next_task());
=162= } else { # close it down
=163= $kids{$kid}[0]->close;
=164= }
=165= next READY;
=166= }
=167= ## something broken with this kid...
=168= kill 15, $kid;
=169= delete $kids{$kid}; # forget about it
=170= }
=171= redo if %kids or task_count();
=172= }
=173= }
=174=
=175= sub create_kid {
=176= my $to_kid = IO::Pipe->new;
=177= my $from_kid = IO::Pipe->new;
=178= defined (my $kid = fork) or return; # if can't fork, try to make do
=179= unless ($kid) { # I'm the kid
=180= $to_kid->reader;
=181= $from_kid->writer;
=182= $from_kid->autoflush(1);
=183= do_kid($to_kid, $from_kid);
=184= exit 0; # should not be reached
=185= }
=186= $from_kid->reader;
=187= $to_kid->writer;
=188= $to_kid->autoflush(1);
=189= $kids{$kid} = [$to_kid, $from_kid];
=190= $kid;
=191= }
=192=
=193= sub send_to_kid {
=194= my ($kid, $message) = @_;
=195= {
=196= ## if we get a SIGPIPE here, no biggy, we'll requeue request later
=197= local $SIG{PIPE} = 'IGNORE';
=198= print { $kids{$kid}[0] } pack("L", length($message)), $message;
=199= }
=200= }
=201=
=202= sub do_kid {
=203= my($input, $output) = @_;
=204= warn "kid launched" if $trace;
=205= {
=206= last unless read($input, my $length, 4) == 4;
=207= $length = unpack "L", $length;
=208= last unless read($input, my $message, $length) == $length;
=209= $message = thaw($message) or die "Cannot thaw";
=210= my ($key, @values) = @$message;
=211= my @results = $kid_task->($key, @values);
=212= $message = freeze([$key, @results]);
=213= print $output pack("L", length($message)), $message;
=214= redo;
=215= }
=216= warn "kid ending" if $trace;
=217= exit 0;
=218= }
=219= }