Copyright Notice

This text is copyright by InfoStrada Communications, Inc., and is used with their permission. Further distribution or use is not permitted.

This text has appeared in an edited form in Linux Magazine. However, the version you are reading here is as the author originally submitted the article for publication, not after their editors applied their creativity.


Linux Magazine Column 15 (Aug 2000)

[Suggested title: Getting your kids to do the work]

Have you ever gone out into the workshop to make something interesting, only to find that the workbench you want to use is too short or long or not high enough or doesn't have clamps in the right places or maybe it's just too uneven? So then you sit down and spend some time first creating a good workbench, in hopes that this will support (literally) your work in creating the thing you had started out to make.

Well, that's what happened to me the other day, in my virtual workshop. I started out intending to make this month's column about a parallel link checker that verifies that my website has good links, both internally and externally. And I wanted to try something new, since this topic has been beaten to death both by me and by other authors.

The new angle I wanted was to use forked kids to do the lookup and processing for each page, which a parent process could then manage (a bit like herding cats, but close enough). The parent would try to keep all the kids busy, and coordinate the results in the event that a kid got distracted. Also, in the case of link verification, each kid response was likely to point out other links to verify, so some way of coordinating the whole shebang was also needed in the parent.

So, I rummaged around for the right event loop or RPC package. I figured this was already a frequently-solved problem, so there was no point in reinventing the wheel. I investigated the Event package, POE, and most of the stuff in the RPC:: and IPC:: sections of the CPAN. I was astounded to find that nobody seemed to have tackled this particular problem the way I wanted it done. Or at least, not in a way that I could easily find.

I even dropped onto my favorite IRC channel, #perl on EFnet, and got a bunch of ``oh that's simple, you just do X'', but when I challenged them to write the ``15 lines'' of code they claimed, I found that they had not addressed one or more of the many important issues.

For example, each task will take a varying length of time, so you can't just deal the tasks out evenly up front. Instead, you have to hand each kid its next task as soon as it finishes the previous one. Kids might also go away or hang, so after some timeout you need to resubmit their tasks to another kid. The parent also needs to see the results in near-realtime, so that new requests can be generated rapidly from old responses. Finally, the data going to and from the kids has to be of arbitrary size, and it has to be compatible with a marshaller such as Storable.

So, I set out to write my own kid task manager, and the result, including a test harness, is [listing one, below]. I'm quite pleased with the result, but I've only built the workbench so far. Next month, I'll build the link checker using this code, so stay tuned. (As the code is fairly long, I won't be doing my usual line-by-line commentary, but instead will hit the important points.)

Lines 5 through 17 set up wrappers around warn and die functions. Because we fork, we'll want to know which process (the parent or one of the kids) sent the message, so we'll add it. And as long as we're hacking the message, a little timestamp prefix won't hurt either.
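
The prefixing itself is a single substitution with the /m flag, which makes ^ match at the start of every line of a multi-line message. Here's a minimal standalone sketch of just that part. Note that prefix_lines is a hypothetical helper and does not appear in the listing:

```perl
#!/usr/bin/perl
use strict;
use warnings;

# Build the same "[pid] [timestamp] " prefix the listing's __WARN__ and
# __DIE__ handlers use, and put it in front of every line of the message.
# (With /m, ^ does not match after a newline that ends the string, so a
# trailing "\n" does not get a dangling prefix.)
sub prefix_lines {                       # hypothetical helper name
  my $message = shift;
  my $prefix = "[$$] [" . localtime() . "] ";
  $message =~ s/^/$prefix/gm;
  return $message;
}

my $tagged = prefix_lines("first line\nsecond line\n");
print $tagged;
```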

Lines 19 to 73 set up the test harness for the code, which really begins at line 77. The test starts with a single task to be executed, numbered 1, and named task00001. (Each task name must be unique.)

The task data consists of the task number and a ``payload'' of varying length. The child receives the task name, task number, and payload as part of the remote procedure call. It returns the unmodified payload along with two new task numbers: twice the original task number, and one more than that. So task 1 triggers tasks 2 and 3. Task 2 triggers tasks 4 and 5. Task 3 triggers tasks 6 and 7. But in all likelihood, tasks 2 and 3 will be executing in parallel, as will 4, 5, 6, and 7. So the number of tasks to be processed grows exponentially, doubling at each level, until the task numbers reach the $MAX value defined in line 25.
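
Every task number n has exactly one parent (the integer half of n), so the tree by itself never generates the same number twice. The following serial simulation is not part of the listing. It suggests what the harness should end up with when $MAX is 2048:

```perl
use strict;
use warnings;

# Serial simulation of the harness's task tree, without any forking:
# task n yields tasks 2n and 2n+1, and a new task is queued only while
# its number stays below $MAX (2048 in the listing).
my $MAX = 2048;
my @pending = (1);
my %seen;
while (defined(my $n = shift @pending)) {
  $seen{$n}++;
  for my $child (2 * $n, 2 * $n + 1) {
    push @pending, $child if $child < $MAX;
  }
}

my @numbers = sort { $a <=> $b } keys %seen;
printf "%d tasks, numbered %d through %d\n",
  scalar(@numbers), $numbers[0], $numbers[-1];
```

So if a task shows up more than once in the final tally, that comes from requeueing and not from the tree itself.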

Lines 19 to 22 generate a specific task, calling the add_task routine defined in the kid task manager. The first parameter is the task name, and must be a unique string. The remaining parameters are passed to Storable, and can be any permitted data structure.
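
Here's a small sketch of what happens to one task's arguments. It relies only on Storable's freeze and thaw, and shows two things: the zero-padded name, and the single frozen string that next_task builds from the key plus the stored parameters:

```perl
use strict;
use warnings;
use Storable qw(freeze thaw);

# The harness names task $n with sprintf("task%05d", $n). The names are
# unique, and they sort in numeric order even when compared as strings.
my $n = 3;
my $key = sprintf("task%05d", $n);
my @params = ($n, "xxxxxxxxxxxx" x $n);   # same shape as add_task's arguments

# add_task stores [@params]; next_task later freezes [$key, @params]
# into one string (freeze needs a reference, hence the anonymous array).
my $frozen = freeze([$key, @params]);
my $copy = thaw($frozen);
print "$copy->[0]: n=$copy->[1], payload length ", length($copy->[2]), "\n";
```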

Once the first task is queued (in line 24), the main running loop is started, using the rather largish run_queue invocation in lines 29 to 69. The parameters passed are in ``key/value'' ordering, starting with a commented-out Trace value of 1 (in line 30). Uncommenting this line causes a lot of extra tracing of child process launches and terminations that was helpful during the debugging.
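
The key/value handling follows a common pattern. Copy @_ into a hash, delete each recognized key, and complain about anything left over. That way, a typo like KidMaxx doesn't silently fall back to a default. For illustration, here it is pulled out into a hypothetical parse_parms helper:

```perl
use strict;
use warnings;

# run_queue-style argument handling: delete each recognized key from a
# copy of the arguments (keeping the default when a key is absent), then
# die if anything unrecognized is left over.
sub parse_parms {                           # hypothetical helper name
  my %parms = @_;
  my %config = (KidMax => 5, Trace => 0);   # same defaults as the listing
  for my $name (qw(KidMax KidTask ResultTask Trace)) {
    $config{$name} = delete $parms{$name} if exists $parms{$name};
  }
  die "unknown parameters for run_queue: ", join(" ", sort keys %parms), "\n"
    if keys %parms;
  return \%config;
}

my $config = parse_parms(KidMax => 15, KidTask => sub { @_ });
print "KidMax=$config->{KidMax} Trace=$config->{Trace}\n";

# A misspelled key is caught instead of silently ignored.
eval { parse_parms(KidMaxx => 3) };
my $error = $@;
print "error: $error";
```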

Next, in line 31, we've got the KidMax value. This is the number of parallel processes to execute. If the processes are mostly in I/O wait (like DNS lookup or waiting for some remote web page to respond), we can set this fairly high and still not be a bad neighbor to the other processes on the box. Experimentation is the key here. I'll probably have a lot more to say about this number when I start running my web link checker using this code.

Lines 32 to 48 define the activity of each kid once it gets a task. The value is a coderef, here defined by an anonymous subroutine. The incoming parameters are the unmarshalled values that had been handed to add_task, item for item. It's important to remember here that the kid doesn't have access to the parent's memory space, although any global data or subroutines set up prior to the beginning of run_queue are correctly copied along to the kids. For the test harness, we've got three tweakable simulator items: a CPU waster (lines 35 to 38), a time waster (lines 39 to 41), and a flakiness generator (lines 42 to 44). You can disable or enable these by inserting or removing ``0 and'' in front of the description strings, as I've done to disable the CPU waster.
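
The switch works because any non-empty string is true in Perl, while ``0 and'' short-circuits the whole condition to false. A tiny illustration:

```perl
use strict;
use warnings;

# A descriptive string is a true condition; putting "0 and" in front of
# it makes the condition false without deleting the block.
my @ran;
if (0 and "simulate child burn") { push @ran, "burn" }
if ("simulate child delay")      { push @ran, "delay" }
print "enabled: @ran\n";   # only the delay block runs
```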

Finally, lines 46 and 47 handle the pseudo-task. The incoming $n is returned as 2 * $n and 2 * $n + 1, while the payload string is passed through unchanged. Not rocket science here, because I was more interested in ensuring that large data would be correctly handed to and from the kids.

Lines 49 to 68 define the result task back in the parent, again as an anonymous subroutine (actually a closure at this point, because it refers to %results and $MAX from the outer scope). The incoming parameters in line 50 are the task name and any list returned by the child. Again, this response list has gone through a marshalling step, but Storable does a nice job here. Lines 52 to 59 take care of queuing up the new tasks, provided this is the first response we've gotten for this task. Why might we get multiple responses? Well, I've set up the kid manager to resubmit any tasks for which we haven't seen a response yet, so the same task will sometimes be requeued to ensure completion. As a result, we also get the occasional duplicate when two active kids were given the same task. Oh well.
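
The duplicate check relies on post-increment returning the old value. That value is 0 (false) the first time a key shows up, and a true count every time after that. For example:

```perl
use strict;
use warnings;

# $results{$key}++ returns the count from *before* the increment:
# 0 (false) for a key seen for the first time, positive (true) after.
my %results;
my @log;
for my $key (qw(task00001 task00002 task00001)) {
  if ($results{$key}++) {
    push @log, "DUPLICATE $key";
  } else {
    push @log, "first $key";   # only here would new tasks be queued
  }
}
print "$_\n" for @log;
```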

Lines 61 to 67 dump out each response in a controlled way, even when there's a huge payload. Any string longer than 32 characters is truncated to its first 29 characters, followed by a count of the characters dropped, much like the output of the DBI module when tracing interactions with the database.
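
You can try the truncation rule on its own. This sketch wraps the same logic in a hypothetical abbreviate subroutine, using an explicit variable instead of $_:

```perl
use strict;
use warnings;

# Same rule as the ResultTask's map: strings longer than 32 characters
# become their first 29 characters, "... + ", and the dropped count.
sub abbreviate {                     # hypothetical helper name
  my $s = shift;
  return length($s) > 32 ? substr($s, 0, 29) . "... + " . (length($s) - 29) : $s;
}

print abbreviate("x" x 40), "\n";    # 29 x's, then "... + 11"
print abbreviate("short"), "\n";     # unchanged
```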

And finally, to check the duplication or absence of task completion, lines 71 through 73 display how many times each task result was seen.

So, that's the test harness; now let's see how the work gets done. Lines 77 to 80 bring in some modules. The first three are from the standard distribution, and the fourth is from the CPAN. Lines 82 through 109 define the ``task manager'' subroutines. They need access to both the %tasks hash, which holds the tasks to be sent to the kids, and a @queue cache of task keys, used to deal out the tasks fairly. The only subroutine here meant to be called from outside this section (it's marked ``external'') is add_task, which adds an entry to the task hash.
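
Both BEGIN blocks use the same trick. Named subroutines declared inside a block share that block's lexical variables, and nothing else in the file can see those variables. On top of that, BEGIN runs the block at compile time, so initializers such as my $kid_max = 5 have already run before the harness code at the top of the file calls in. Here's a cut-down sketch, in which task_limit is a hypothetical addition:

```perl
use strict;
use warnings;

# Top-level code runs first, yet the subs and their private state are
# ready, because the BEGIN block below was executed at compile time.
add_task("task00001", 1, "payload");
add_task("task00002", 2, "payload");
remove_task("task00001");
print "tasks left: ", task_count(), ", limit ", task_limit(), "\n";

BEGIN {
  my %tasks;          # visible only to the subs in this block
  my $limit = 5;      # already assigned when the calls above execute
  sub add_task    { my $key = shift; $tasks{$key} = [@_] }
  sub remove_task { delete $tasks{+shift} }
  sub task_count  { scalar keys %tasks }
  sub task_limit  { $limit }   # hypothetical accessor for the demo
}
```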

Lines 100 to 108 define the next_task subroutine, which deals out the tasks in the order specified by @queue. Initially, the queue is empty, so a sorted list of keys serves to initialize it. Items are removed from the queue one by one, skipping any whose task has already been completed, and the queue is refreshed from the current list of keys whenever it empties. This is a simplistic model, and it leads to duplicate dealing of tasks. But it was the easiest approach I could think of that avoided tracking timeouts for every task, with all the messiness that would bring.
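
Here's a standalone sketch of that dealing order. I've left out the freezing so the keys are easier to see. next_key is a hypothetical stand-in for next_task, and a while loop replaces the bare block with redo:

```perl
use strict;
use warnings;

# next_task's dealing strategy, minus the freeze: walk a cached, sorted
# snapshot of the task keys, refill it when it runs dry, and skip keys
# whose tasks were completed in the meantime.
my %tasks = map { $_ => [] } qw(taskA taskB taskC);
my @queue;

sub next_key {                              # hypothetical name
  return undef unless %tasks;
  while (1) {
    @queue = sort keys %tasks unless @queue;
    my $key = shift @queue;
    return $key if exists $tasks{$key};     # might have disappeared
  }
}

my @dealt = map { next_key() } 1 .. 4;      # taskA taskB taskC taskA
delete $tasks{taskB};                       # pretend taskB completed
push @dealt, map { next_key() } 1 .. 3;     # taskB is skipped from now on
print "@dealt\n";
```

Unfinished tasks keep coming around on every sweep. That is exactly how a task lost with a dead kid gets retried without any timeout bookkeeping.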

Lines 111 to 219 define the kid manager. Lines 112 through 116 provide the ``regional'' (between global and local) variables for these subroutines. %kids is a simple hash keyed by process ID, with a value of an arrayref containing the send and receive IO handles. The next four variables ($kid_max, $kid_task, $result_task, and $trace) hold the values given to run_queue by the caller. Note that two of these have a default value, but the others are expected to be given at the invocation.

Speaking of run_queue, that's the meat of this code, defined in lines 118 to 173. The first eight lines grab the parameters and save them away for later use. The main loop starting in line 129 handles the various phases of the event processing sequence. First, any dead kids are harvested in lines 132 to 135. Second, any kids that don't show up on the radar are forgotten about in lines 137 to 141 (this can happen only if something in the ResultTask also forks and/or waits for kids). Third, if there are more tasks than kids and we're still under the KidMax limit, lines 143 to 147 fork one new kid and ship it a task.
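
The order of the first two phases matters. A kid that has exited but hasn't been reaped is a zombie, and on Linux and typical Unix systems kill 0 still succeeds on a zombie, so the reaping has to come first. Here's a small sketch, assuming a system where fork works:

```perl
use strict;
use warnings;
use POSIX qw(WNOHANG);

# Fork a child that exits at once. Until it is reaped it is a zombie,
# which kill 0 still reports as present; waitpid(-1, WNOHANG) collects
# it without blocking, and after that kill 0 fails.
my $pid = fork;
die "fork failed: $!" unless defined $pid;
exit 0 unless $pid;                      # child: exit immediately

select(undef, undef, undef, 0.5);        # give the child time to exit
my $alive_before = kill(0, $pid);        # zombie: signal 0 still succeeds

my @reaped;
for (1 .. 50) {                          # poll; waitpid never blocks here
  while ((my $kid = waitpid(-1, WNOHANG)) > 0) {
    push @reaped, $kid;
  }
  last if @reaped;
  select(undef, undef, undef, 0.1);
}
my $alive_after = kill(0, $pid);
print "before reap: $alive_before, reaped: @reaped, after reap: $alive_after\n";
```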

Finally, any ready responses are detected in the bottom part of the loop, beginning in line 149. To achieve this, we set up an IO::Select object with all the read handles for the kids, and wait for up to 1 second for any of them to become ready. For each one that is, we read the length word for the response (the first 4 bytes) and then the response itself. On a good thawing (line 157), the task is removed as complete, and the ResultTask is called.
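
You can exercise the framing without any processes at all. This sketch writes frames through an in-memory filehandle and reads them back; write_frame and read_frame are hypothetical names. Note that pack "L" uses native byte order. That's fine for pipes on a single machine, but "N" (network order) would be the choice if the bytes ever had to travel between machines:

```perl
use strict;
use warnings;
use Storable qw(freeze thaw);

# Frame format used on the pipes: a 4-byte length packed with "L"
# (unsigned 32-bit, native byte order), then that many bytes of
# Storable data.
sub write_frame {                       # hypothetical name
  my ($fh, $data) = @_;
  my $message = freeze($data);
  print $fh pack("L", length($message)), $message;
}

sub read_frame {                        # hypothetical name
  my $fh = shift;
  my ($length, $message);
  return undef unless read($fh, $length, 4) == 4;   # EOF or short read
  $length = unpack "L", $length;
  return undef unless read($fh, $message, $length) == $length;
  return thaw($message);
}

# Write two frames into an in-memory buffer, then read them back.
my $buffer = '';
open my $out, '>', \$buffer or die "cannot open buffer: $!";
write_frame($out, ["task00001", 1, "x" x 100_000]);
write_frame($out, ["task00002", 2, "small"]);
close $out;

open my $in, '<', \$buffer or die "cannot open buffer: $!";
my @frames;
while (defined(my $frame = read_frame($in))) {
  push @frames, $frame;
}
printf "%d frames; first payload is %d bytes\n",
  scalar(@frames), length($frames[0][2]);
```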

As we now have an idle kid, we decide whether to feed it another task (line 161) or shut it down by closing its input pipe (line 163), which ends the kid's read loop. Can't have them just sitting around, ya know. Lines 168 and 169 get triggered only when the kid didn't say the right thing, meaning we've gotten out of sync or something serious has happened. In that case we kill the kid and forget about it. And line 171 keeps the loop running as long as we have tasks or kids to manage.

Lines 175 to 191 fire off a kid, via a fork with a pair of pipes set up, one for each direction. A pipe can block if it's full, but our algorithm ensures that it's OK if we block when reading or writing the pipe, because it'll be for only a very short time. (That was one of the cool things I came up with when coding this particular code chunk.) The return value is the process ID of the kid that just got created. If forking fails in line 178, the parent will simply make do with the kids it has, reusing the existing kids, and retrying the fork on the next loop automatically (cool).
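
Stripped of the task protocol, the two-pipe fork setup looks like the following minimal sketch. Here the kid simply uppercases whatever lines it's sent until its input reaches end-of-file:

```perl
use strict;
use warnings;
use IO::Pipe;

# One IO::Pipe per direction, as in create_kid. After the fork, each
# side calls ->reader or ->writer on each pipe to keep the end it needs
# (the other end is closed in that process).
my $to_kid   = IO::Pipe->new;
my $from_kid = IO::Pipe->new;
defined(my $pid = fork) or die "fork failed: $!";

unless ($pid) {                 # kid: uppercase each line it receives
  $to_kid->reader;
  $from_kid->writer;
  $from_kid->autoflush(1);
  while (my $line = <$to_kid>) {
    print $from_kid uc $line;
  }
  exit 0;                       # parent closed its end, so we're done
}

$to_kid->writer;                # parent
$from_kid->reader;
$to_kid->autoflush(1);
print $to_kid "hello, kid\n";
my $reply = <$from_kid>;
close $to_kid;                  # end-of-file tells the kid to finish
waitpid($pid, 0);
print "kid said: $reply";
```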

Lines 193 to 200 handle the parent-to-kid transmission, simply by wrapping a print inside a block that ignores SIGPIPE. If a kid dies before the message is fully delivered, we don't really care. We'll see the dead kid on the next loop, and its task, still sitting in %tasks, gets dealt out again on the next sweep of the task queue. Simplicity wins here.
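
Here's what ignoring SIGPIPE buys us. Without the local setting, the print below would kill the process outright; with it, print merely returns false and $! says EPIPE. A sketch using a plain pipe with nobody left reading:

```perl
use strict;
use warnings;
use IO::Handle;   # for autoflush on older Perls

# With SIGPIPE ignored, writing to a pipe whose reading end is closed
# does not kill the process: print returns false and $! is EPIPE.
pipe(my $reader, my $writer) or die "pipe failed: $!";
close $reader;                     # nobody is left to read
$writer->autoflush(1);             # make print hit the pipe right away

my ($ok, $error);
{
  local $SIG{PIPE} = 'IGNORE';     # restored when the block exits
  $ok = print $writer "are you there?\n";
  $error = $!{EPIPE} ? "EPIPE" : "$!";
  close $writer;                   # any retried flush also fails quietly here
}
print "print returned ", ($ok ? "true" : "false"), " ($error)\n";
```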

Finally, lines 202 through 218 give the main kid loop. The kid repeatedly waits for a 4-byte length, followed by a Storable-encoded list. The list is decoded and handed to the KidTask subroutine, and the results are encoded and returned in the same style.
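
To see the kid's side of the conversation without forking, here's a sketch of the same loop reading from and writing to in-memory filehandles. kid_loop is a hypothetical name. It returns at end-of-file, whereas the listing's do_kid calls exit:

```perl
use strict;
use warnings;
use Storable qw(freeze thaw);

# do_kid-style loop: read framed requests until EOF, hand each one to
# the task coderef, and write framed replies carrying the same key.
sub kid_loop {                                   # hypothetical name
  my ($input, $output, $task) = @_;
  while (1) {
    my ($length, $message);
    last unless read($input, $length, 4) == 4;
    $length = unpack "L", $length;
    last unless read($input, $message, $length) == $length;
    my $request = thaw($message) or die "Cannot thaw";
    my ($key, @values) = @$request;
    my $reply = freeze([$key, $task->($key, @values)]);
    print $output pack("L", length($reply)), $reply;
  }
}

# Two requests shaped like the harness's tasks: [name, n, payload].
my $requests = '';
for my $n (1, 2) {
  my $frozen = freeze([sprintf("task%05d", $n), $n, "xx" x $n]);
  $requests .= pack("L", length($frozen)) . $frozen;
}

my $replies = '';
open my $in,  '<', \$requests or die "cannot open requests: $!";
open my $out, '>', \$replies  or die "cannot open replies: $!";
kid_loop($in, $out, sub { my ($key, $n, @payload) = @_; (2 * $n, 2 * $n + 1, @payload) });
close $out;

# Decode the replies the way the parent would.
open my $back, '<', \$replies or die "cannot open replies: $!";
my @answers;
while (read($back, my $length, 4) == 4) {
  my $size = unpack "L", $length;
  read($back, my $message, $size) == $size or die "short read";
  push @answers, thaw($message);
}
print join(" ", @$_), "\n" for @answers;
```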

Wow. Long pile of code. But useful, and fun, and efficient. Go ahead and play with this for now, but then remember, this is just the workbench. Next month comes the real project! Until then, enjoy.

Listings

        =1=     #!/usr/bin/perl -w
        =2=     use strict;
        =3=     $|++;
        =4=     
        =5=     $SIG{__WARN__} = sub {
        =6=       my $message = shift;
        =7=       my $prefix = "[$$] [".localtime()."] ";
        =8=       $message =~ s/^/$prefix/gm;
        =9=       warn $message;
        =10=    };
        =11=    
        =12=    $SIG{__DIE__} = sub {
        =13=      my $message = shift;
        =14=      my $prefix = "[$$] [".localtime()."] ";
        =15=      $message =~ s/^/$prefix/gm;
        =16=      die $message;
        =17=    };
        =18=    
        =19=    sub generate_task_number_n {
        =20=      my $n = shift;
        =21=      add_task(sprintf("task%05d", $n), $n, "xxxxxxxxxxxx" x $n);
        =22=    }
        =23=    
        =24=    generate_task_number_n(1);
        =25=    my $MAX = 2048;
        =26=    
        =27=    my %results;
        =28=    
        =29=    run_queue(
        =30=              ## Trace => 1,
        =31=              KidMax => 15,
        =32=              KidTask => sub {
        =33=                my($key, @values) = @_;
        =34=    
        =35=                if (0 and "simulate child burn") {
        =36=                  my $target_cpu = (times)[0] + 0.1 + rand 0.2;
        =37=                  1 while (times)[0] < $target_cpu;
        =38=                }
        =39=                if ("simulate child delay") {
        =40=                  select(undef, undef, undef, 0.1 + rand 0.3);
        =41=                }
        =42=                if ("simulate child abort") {
        =43=                  die "simulating child abort" unless rand > 0.05;
        =44=                }
        =45=    
        =46=                my($n, @payload) = @values;
        =47=                return (2 * $n, 2 * $n + 1, @payload);
        =48=              },
        =49=              ResultTask => sub {
        =50=                my($key, @responses) = @_;
        =51=    
        =52=                my($new_2n, $new_2n_plus_1) = @responses;
        =53=                if ($results{$key}++) {
        =54=                  print "DUPLICATE ";
        =55=                } else {
        =56=                  for ($new_2n, $new_2n_plus_1) {
        =57=                    generate_task_number_n($_) if $_ < $MAX;
        =58=                  }
        =59=                }
        =60=    
        =61=                print
        =62=                  "RESULT for $key => ",
        =63=                    join(", ",
        =64=                         map { (length > 32)
        =65=                                 ? substr($_, 0, 29)."... + ".(length() - 29)
        =66=                                   : $_ }
        =67=                         @responses), "\n";
        =68=              },
        =69=             );
        =70=    
        =71=    for (sort keys %results) {
        =72=      print "$_ => $results{$_}\n";
        =73=    }
        =74=    
        =75=    ### forking task manager from here down
        =76=    
        =77=    use IO::Select;
        =78=    use IO::Pipe;
        =79=    use POSIX qw(WNOHANG);
        =80=    use Storable qw(freeze thaw);
        =81=    
        =82=    BEGIN {                         # task manager
        =83=      my %tasks;
        =84=    
        =85=      my @queue;
        =86=    
        =87=      sub add_task { ## external
        =88=        my $key = shift;
        =89=        $tasks{$key} = [@_];
        =90=      }
        =91=    
        =92=      sub remove_task {
        =93=        delete $tasks{+shift};
        =94=      }
        =95=    
        =96=      sub task_count {
        =97=        scalar keys %tasks;
        =98=      }
        =99=    
        =100=     sub next_task {
        =101=       return undef unless task_count() > 0;
        =102=       {
        =103=         @queue = sort keys %tasks unless @queue;
        =104=         my $key = shift @queue;
        =105=         redo unless exists $tasks{$key}; # might have disappeared
        =106=         freeze([$key, @{$tasks{$key}}]);
        =107=       }
        =108=     }
        =109=   }
        =110=   
        =111=   BEGIN {                         # kid manager
        =112=     my %kids;
        =113=     my $kid_max = 5;
        =114=     my $kid_task;
        =115=     my $result_task;
        =116=     my $trace = 0;
        =117=   
        =118=     sub run_queue { ## external
        =119=       {
        =120=         my %parms = @_;
        =121=         $kid_max = delete $parms{KidMax} if exists $parms{KidMax};
        =122=         $kid_task = delete $parms{KidTask} if exists $parms{KidTask};
        =123=         $result_task = delete $parms{ResultTask} if exists $parms{ResultTask};
        =124=         $trace = delete $parms{Trace} if exists $parms{Trace};
        =125=         die "unknown parameters for run_queue: ", join " ", keys %parms
        =126=           if keys %parms;
        =127=       }
        =128=   
        =129=       {
        =130=         warn "to go: ", task_count() if $trace;
        =131=         ## reap kids
        =132=         while ((my $kid = waitpid(-1, WNOHANG)) > 0) {
        =133=           warn "$kid reaped" if $trace;
        =134=           delete $kids{$kid};
        =135=         }
        =136=         ## verify live kids
        =137=         for my $kid (keys %kids) {
        =138=           next if kill 0, $kid;
        =139=           warn "*** $kid found missing ***"; # shouldn't happen normally
        =140=           delete $kids{$kid};
        =141=         }
        =142=         ## launch kids
        =143=         if (task_count() > keys %kids and
        =144=             keys %kids < $kid_max and
        =145=             my $kid = create_kid()) {
        =146=           send_to_kid($kid, next_task());
        =147=         }
        =148=         ## see if any ready results
        =149=       READY:
        =150=         for my $ready (IO::Select->new(map $_->[1], values %kids)->can_read(1)) {
        =151=           ## gotta brute force this, grr, good thing data is small...
        =152=           my ($kid) = grep $kids{$_}[1] == $ready, keys %kids;
        =153=           {
        =154=             last unless read($ready, my $length, 4) == 4;
        =155=             $length = unpack "L", $length;
        =156=             last unless read($ready, my $message, $length) == $length;
        =157=             $message = thaw($message) or die "Cannot thaw";
        =158=             remove_task($message->[0]);
        =159=             $result_task->(@$message);
        =160=             if (task_count() >= keys %kids) {
        =161=               send_to_kid($kid, next_task());
        =162=             } else {              # close it down
        =163=               $kids{$kid}[0]->close;
        =164=             }
        =165=             next READY;
        =166=           }
        =167=           ## something broken with this kid...
        =168=           kill 15, $kid;
        =169=           delete $kids{$kid};     # forget about it
        =170=         }
        =171=         redo if %kids or task_count();
        =172=       }
        =173=     }
        =174=   
        =175=     sub create_kid {
        =176=       my $to_kid = IO::Pipe->new;
        =177=       my $from_kid = IO::Pipe->new;
        =178=       defined (my $kid = fork) or return; # if can't fork, try to make do
        =179=       unless ($kid) {             # I'm the kid
        =180=         $to_kid->reader;
        =181=         $from_kid->writer;
        =182=         $from_kid->autoflush(1);
        =183=         do_kid($to_kid, $from_kid);
        =184=         exit 0;                   # should not be reached
        =185=       }
        =186=       $from_kid->reader;
        =187=       $to_kid->writer;
        =188=       $to_kid->autoflush(1);
        =189=       $kids{$kid} = [$to_kid, $from_kid];
        =190=       $kid;
        =191=     }
        =192=   
        =193=     sub send_to_kid {
        =194=       my ($kid, $message) = @_;
        =195=       {
        =196=         ## if we get a SIGPIPE here, no biggy, we'll requeue request later
        =197=         local $SIG{PIPE} = 'IGNORE';
        =198=         print { $kids{$kid}[0] } pack("L", length($message)), $message;
        =199=       }
        =200=     }
        =201=   
        =202=     sub do_kid {
        =203=       my($input, $output) = @_;
        =204=       warn "kid launched" if $trace;
        =205=       {
        =206=         last unless read($input, my $length, 4) == 4;
        =207=         $length = unpack "L", $length;
        =208=         last unless read($input, my $message, $length) == $length;
        =209=         $message = thaw($message) or die "Cannot thaw";
        =210=         my ($key, @values) = @$message;
        =211=         my @results = $kid_task->($key, @values);
        =212=         $message = freeze([$key, @results]);
        =213=         print $output pack("L", length($message)), $message;
        =214=         redo;
        =215=       }
        =216=       warn "kid ending" if $trace;
        =217=       exit 0;
        =218=     }
        =219=   }

Randal L. Schwartz is a renowned expert on the Perl programming language (the lifeblood of the Internet), having contributed to a dozen top-selling books on the subject, and over 200 magazine articles. Schwartz runs a Perl training and consulting company (Stonehenge Consulting Services, Inc of Portland, Oregon), and is a highly sought-after speaker for his masterful stage combination of technical skill, comedic timing, and crowd rapport. And he's a pretty good Karaoke singer, winning contests regularly.

Schwartz can be reached for comment at merlyn@stonehenge.com or +1 503 777-0095, and welcomes questions on Perl and other related topics.