#!/usr/bin/perl -w
use strict;
$|++;    # autoflush the selected output handle (STDOUT)

## Prefix every line of a warning with "[pid] [timestamp] " so that output
## coming from the parent and from the forked kids can be told apart.
$SIG{__WARN__} = sub {
    my $message = shift;
    my $prefix  = "[$$] [" . localtime() . "] ";
    $message =~ s/^/$prefix/gm;
    warn $message;
};

## Same prefixing for fatal errors; the message is re-thrown with die.
$SIG{__DIE__} = sub {
    my $message = shift;
    my $prefix  = "[$$] [" . localtime() . "] ";
    $message =~ s/^/$prefix/gm;
    die $message;
};

### demo driver: task N produces tasks 2N and 2N+1 until $MAX is reached

## Queue demo task number $n.  The key is "taskNNNNN"; the values are $n
## itself and a payload string whose length grows with $n.
sub generate_task_number_n {
    my $n = shift;
    add_task(sprintf("task%05d", $n), $n, "xxxxxxxxxxxx" x $n);
}

generate_task_number_n(1);

my $MAX = 2048;    # tasks numbered $MAX or above are never generated
my %results;       # key => number of times a result arrived for that key

run_queue(
    ## Trace => 1,
    KidMax  => 15,
    KidTask => sub {
        ## Runs in a kid: receives the task key and its values, returns the
        ## list of values that is sent back to the parent.
        my ($key, @values) = @_;
        if (0 and "simulate child burn") {    # disabled: busy-loop on CPU time
            my $target_cpu = (times)[0] + 0.1 + rand(0.2);
            1 while (times)[0] < $target_cpu;
        }
        if ("simulate child delay") {         # sleep 0.1 .. 0.4 seconds
            select(undef, undef, undef, 0.1 + rand 0.3);
        }
        if ("simulate child abort") {         # die about 5% of the time
            die "simulating child abort" unless rand() > 0.05;
        }
        my ($n, @payload) = @values;
        return (2 * $n, 2 * $n + 1, @payload);
    },
    ResultTask => sub {
        ## Runs in the parent: receives the task key and the kid's results.
        my ($key, @responses) = @_;
        my ($new_2n, $new_2n_plus_1) = @responses;
        if ($results{$key}++) {
            ## the same task can be handed to more than one kid
            print "DUPLICATE ";
        }
        else {
            for ($new_2n, $new_2n_plus_1) {
                generate_task_number_n($_) if $_ < $MAX;
            }
        }
        ## long values are cut to 29 characters plus a count of the rest
        print "RESULT for $key => ",
          join(
            ", ",
            map {
                (length($_) > 32)
                  ? substr($_, 0, 29) . "... + " . (length($_) - 29)
                  : $_
            } @responses
          ),
          "\n";
    },
);

for (sort keys %results) {
    print "$_ => $results{$_}\n";
}

### forking task manager from here down

use IO::Select;
use IO::Pipe;
use POSIX qw(WNOHANG);
use Storable qw(freeze thaw);

BEGIN {    # task manager
    my %tasks;    # key => [ values... ] for every task not yet completed
    my @queue;    # keys still to be handed out in the current pass

    ## add_task($key, @values): register (or replace) a pending task.
    sub add_task {    ## external
        my $key = shift;
        $tasks{$key} = [@_];
    }

    ## remove_task($key): forget a task; returns what delete returns.
    sub remove_task {
        delete $tasks{ +shift };
    }

    ## task_count(): number of pending tasks.
    sub task_count {
        scalar keys %tasks;
    }

    ## next_task(): return the next pending task as a frozen [$key, @values]
    ## string, or nothing when no tasks are pending.  Keys are handed out in
    ## sorted order; once the pass is exhausted it restarts from the current
    ## set of pending keys, so a task that is still pending can be handed
    ## out again.
    sub next_task {
        return unless task_count() > 0;
        {
            @queue = sort keys %tasks unless @queue;
            my $key = shift @queue;
            redo unless exists $tasks{$key};    # might have disappeared
            ## explicit return: the value of a sub whose last statement is
            ## a loop construct is unspecified
            return freeze([$key, @{ $tasks{$key} }]);
        }
    }
}

BEGIN {    # kid manager
    my %kids;    # pid => [ handle to kid, handle from kid ]
    my $kid_max = 5;
    my $kid_task;
    my $result_task;
    my $trace = 0;

    ## run_queue(%parms): run until no kids and no tasks remain.
    ##   KidMax     => maximum number of simultaneous kids (default 5)
    ##   KidTask    => coderef run in a kid as $code->($key, @values);
    ##                 its return list is sent back to the parent
    ##   ResultTask => coderef run in the parent as $code->($key, @results)
    ##   Trace      => true to warn about progress
    ## Dies on any other parameter name.
    sub run_queue {    ## external
        {
            my %parms = @_;
            $kid_max     = delete $parms{KidMax}     if exists $parms{KidMax};
            $kid_task    = delete $parms{KidTask}    if exists $parms{KidTask};
            $result_task = delete $parms{ResultTask} if exists $parms{ResultTask};
            $trace       = delete $parms{Trace}      if exists $parms{Trace};
            die "unknown parameters for run_queue: ", join " ", keys %parms
              if keys %parms;
        }

        {
            warn "to go: ", task_count() if $trace;

            ## reap kids
            while ((my $kid = waitpid(-1, WNOHANG)) > 0) {
                warn "$kid reaped" if $trace;
                delete $kids{$kid};
            }

            ## verify live kids
            for my $kid (keys %kids) {
                next if kill 0, $kid;
                warn "*** $kid found missing ***";    # shouldn't happen normally
                delete $kids{$kid};
            }

            ## launch kids (at most one per pass)
            if (    task_count() > keys %kids
                and keys %kids < $kid_max
                and my $kid = create_kid())
            {
                send_to_kid($kid, next_task());
            }

            ## see if any ready results (waits up to one second)
          READY:
            for my $ready (IO::Select->new(map $_->[1], values %kids)->can_read(1)) {
                ## gotta brute force this, grr, good thing data is small...
                my ($kid) = grep $kids{$_}[1] == $ready, keys %kids;
                {
                    ## message framing: 4-byte native length, then a frozen
                    ## [$key, @results]; a short read leaves this block
                    last unless read($ready, my $length, 4) == 4;
                    $length = unpack "L", $length;
                    last unless read($ready, my $message, $length) == $length;
                    $message = thaw($message) or die "Cannot thaw";
                    remove_task($message->[0]);
                    $result_task->(@$message);
                    if (task_count() >= keys %kids) {
                        send_to_kid($kid, next_task());
                    }
                    else {    # close it down
                        $kids{$kid}[0]->close;
                    }
                    next READY;
                }
                ## something broken with this kid...
                kill 15, $kid;
                delete $kids{$kid};    # forget about it
            }

            redo if %kids or task_count();
        }
    }

    ## create_kid(): fork a kid connected by a pair of pipes.  In the parent,
    ## records the kid in %kids and returns its pid; returns nothing if the
    ## fork fails.  The kid never returns from here.
    sub create_kid {
        my $to_kid   = IO::Pipe->new;
        my $from_kid = IO::Pipe->new;
        defined(my $kid = fork) or return;    # if can't fork, try to make do
        unless ($kid) {                       # I'm the kid
            ## Drop the inherited copies of the parent's handles to earlier
            ## kids; while this process holds the write end of a sibling's
            ## input pipe, that sibling cannot see EOF when the parent
            ## closes it.
            %kids = ();
            $to_kid->reader;
            $from_kid->writer;
            $from_kid->autoflush(1);
            do_kid($to_kid, $from_kid);
            exit 0;                           # should not be reached
        }
        $from_kid->reader;
        $to_kid->writer;
        $to_kid->autoflush(1);
        $kids{$kid} = [$to_kid, $from_kid];
        $kid;
    }

    ## send_to_kid($kid, $message): write one length-prefixed message to the
    ## kid's input pipe.
    sub send_to_kid {
        my ($kid, $message) = @_;
        {
            ## if we get a SIGPIPE here, no biggy, we'll requeue request later
            local $SIG{PIPE} = 'IGNORE';
            print { $kids{$kid}[0] } pack("L", length($message)), $message;
        }
    }

    ## do_kid($input, $output): kid main loop.  Reads length-prefixed frozen
    ## [$key, @values] requests from $input, runs the KidTask on each, and
    ## writes back a length-prefixed frozen [$key, @results].  Exits when a
    ## complete request cannot be read.
    sub do_kid {
        my ($input, $output) = @_;
        warn "kid launched" if $trace;
        {
            last unless read($input, my $length, 4) == 4;
            $length = unpack "L", $length;
            last unless read($input, my $message, $length) == $length;
            $message = thaw($message) or die "Cannot thaw";
            my ($key, @values) = @$message;
            my @results = $kid_task->($key, @values);
            $message = freeze([$key, @results]);
            print $output pack("L", length($message)), $message;
            redo;
        }
        warn "kid ending" if $trace;
        exit 0;
    }
}