Extending ReactPHP's Child Processes Part Two
react/child-process is very flexible and can work in a lot of ways, but sometimes you don't want to be bothered with the details of how it works and just want a simpler API.
wyrihaximus/react-child-process-pool
The pool package has nearly the same API as wyrihaximus/react-child-process-messenger, with the difference that you're dispatching calls to the first available child process in the pool.
This makes setting up resource pools, or pools for CPU-intensive work, fairly simple. On Linux it even supports assigning a child process to each available CPU core. (OSX and Windows CPU core detection are on the roadmap.)
In fact, wyrihaximus/react-child-process-pool uses wyrihaximus/react-child-process-promise from the previous article under the hood to detect the CPU core count.
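To make the idea of CPU core detection concrete, here is a minimal synchronous sketch. It is not how the pool package does it (that goes through wyrihaximus/react-child-process-promise and stays asynchronous); the helper name `detectCpuCoreCount` is hypothetical, and it assumes a Linux box where the coreutils `nproc` command is available.

```php
<?php

/**
 * Hypothetical helper, not part of any wyrihaximus package.
 * Returns the number of CPU cores reported by `nproc`, or $fallback
 * when the command is unavailable or prints nothing usable.
 */
function detectCpuCoreCount(int $fallback = 1): int
{
    // shell_exec may be disabled via php.ini on locked-down hosts.
    if (!function_exists('shell_exec')) {
        return $fallback;
    }

    // `nproc` prints the number of processing units available to this process.
    $output = shell_exec('nproc 2>/dev/null');
    $count = is_string($output) ? (int) trim($output) : 0;

    return $count > 0 ? $count : $fallback;
}

echo detectCpuCoreCount(), PHP_EOL;
```

A blocking call like this is fine in a start-up script, but inside a running event loop you would want the asynchronous route the pool takes.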
Doctrine DBAL
Let's set up a flexible pool that executes Doctrine DBAL queries for you; the code comments walk you through it.
But first we need a class to run inside the child process:
use React\EventLoop\LoopInterface;
use WyriHaximus\React\ChildProcess\Messenger\ChildInterface;
use WyriHaximus\React\ChildProcess\Messenger\Messenger;
use WyriHaximus\React\ChildProcess\Messenger\Messages\Payload;
use function React\Promise\resolve;

final class DoctrineDBAL implements ChildInterface
{
    /**
     * How to get the DBAL here is beyond the scope of this article,
     * but let's assume it is an instance of `REPLACE_WITH_ACTUAL_DBAL_FQCN`.
     */
    private $dbal;

    /**
     * Set up any available RPCs.
     */
    public static function create(Messenger $messenger, LoopInterface $loop)
    {
        // `create` is static, so there is no `$this` here;
        // keep the DBAL on an instance the closure can capture instead.
        $child = new self();

        /**
         * Register the query RPC we'll be using in our next file.
         */
        $messenger->registerRpc('fetchColumn', function (Payload $payload) use ($child) {
            return resolve([
                'column' => $child->dbal->fetchColumn(
                    $payload['query'],
                    $payload['parameters'] ?? [],
                    0
                ),
            ]);
        });
    }
}
Now that we have a class handling the DBAL interaction inside the child process, we can set up the pool that runs it:
use React\EventLoop\Factory as EventLoopFactory;
use WyriHaximus\React\ChildProcess\Messenger\Messages\Factory as MessageFactory;
use WyriHaximus\React\ChildProcess\Messenger\Messages\Payload;
use WyriHaximus\React\ChildProcess\Pool\Factory\Flexible;
use WyriHaximus\React\ChildProcess\Pool\Options;
use WyriHaximus\React\ChildProcess\Pool\PoolInterface;

/**
 * 1. First off we need the event loop.
 */
$loop = EventLoopFactory::create();

/**
 * 2. We need to tell the pool how many processes it should spawn minimally and maximally.
 *    This lets us quickly reuse processes when we need them and close them down when we don't.
 */
$options = [
    Options::MIN_SIZE => 0, // Keeping MIN_SIZE at 0 means no process will be running when
                            // there are no outstanding or queued calls.
    Options::MAX_SIZE => 5, // Depending on what the pool does and how many resources your
                            // box has you set MAX_SIZE to something suitable.
    Options::TTL      => 3, // TTL at 3 means a process with nothing left to do is kept
                            // alive for 3 seconds before it is terminated.
];

/**
 * 3. Create the pool with the child process class we've created in the previous code block and
 *    our process options.
 */
Flexible::createFromClass(DoctrineDBAL::class, $loop, $options)->done(function (PoolInterface $pool) {
    // You now have a pool that spawns no child processes on start.
    // But when you call rpc, new child processes are started for as long
    // as the pool has work in the queue, up to a maximum of five.
    $pool->rpc(
        MessageFactory::rpc(
            'fetchColumn',
            [
                'query' => 'SELECT COUNT(id) AS user_count FROM users',
            ]
        )
    )->done(function (Payload $result) {
        // The driver may return the count as a string, so cast before comparing.
        $c = (int) $result['column'];
        echo 'Found ', $c, ' ', ($c === 1 ? 'user' : 'users'), PHP_EOL;
    });

    // Note that once done with the pool we need to shut it down with $pool->terminate();
    // this ensures we don't keep any child processes running unnecessarily.
});

/**
 * 4. Run the loop and set everything in motion.
 */
$loop->run();
The only difference when creating a flexible CPU core count pool is that we'd swap Flexible with CpuCoreCountFlexible.
In that case MAX_SIZE is set to the number of CPU cores detected. I want to highlight the beauty of using a minimum size of zero on a flexible pool again: especially with database connections, this avoids errors like MySQL has gone away when there is a long time between queries, because idle child processes are shut down after the TTL instead of sitting on a stale connection.
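To get a feel for how MIN_SIZE, MAX_SIZE and TTL interact, the following toy model steps through the sizing rules described in the comments above, one second at a time. It is only an illustration under my own simplifying assumptions, not the pool's actual implementation; `simulatePoolSize` and its parameters are hypothetical names.

```php
<?php

/**
 * Toy model of a flexible pool's size over time; NOT the library's code.
 *
 * @param int   $minSize        Lower bound on running workers.
 * @param int   $maxSize        Upper bound on running workers.
 * @param int   $ttl            Seconds an idle worker survives.
 * @param int[] $callsPerSecond Calls that need handling during each second.
 * @return int[] Number of running workers at the end of each second.
 */
function simulatePoolSize(int $minSize, int $maxSize, int $ttl, array $callsPerSecond): array
{
    $idleSeconds = []; // one entry per running worker: how long it has been idle
    $sizes = [];

    foreach ($callsPerSecond as $calls) {
        $busy = min($calls, $maxSize);

        // Spawn workers until there are enough for the work at hand (and the minimum).
        while (count($idleSeconds) < max($busy, $minSize)) {
            $idleSeconds[] = 0;
        }

        // The first $busy workers handle calls; the rest idle for another second.
        foreach ($idleSeconds as $i => $idle) {
            $idleSeconds[$i] = $i < $busy ? 0 : $idle + 1;
        }

        // Reap workers that reached the TTL, never dropping below the minimum.
        $kept = [];
        foreach ($idleSeconds as $idle) {
            if ($idle < $ttl || count($kept) < $minSize) {
                $kept[] = $idle;
            }
        }
        $idleSeconds = $kept;

        $sizes[] = count($idleSeconds);
    }

    return $sizes;
}

// Two calls in the first second, then silence.
echo implode(',', simulatePoolSize(0, 5, 3, [2, 0, 0, 0, 0])), PHP_EOL; // prints 2,2,2,0,0
```

With a minimum size of zero the model drops to no workers once the TTL has passed, which is the behaviour that keeps long-idle database connections from lingering.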
wyrihaximus/react-child-process-closure
Now the above example is already very powerful, but what if we could bring running an arbitrary closure from wyrihaximus/react-child-process-promise into the pool? That can be done with wyrihaximus/react-child-process-closure, and it isn't a big change to your code. There are, however, a few caveats, as described in the readme of Super Closure, which wyrihaximus/react-child-process-closure uses; in short, any referenced variable won't transfer over to the child.
use WyriHaximus\React\ChildProcess\Closure\ClosureChild;
use WyriHaximus\React\ChildProcess\Closure\MessageFactory;
use function React\Promise\all;

Flexible::createFromClass(ClosureChild::class, $loop, $options)->then(function (PoolInterface $pool) {
    // The closure that will be serialized and executed inside a child process.
    $callback = function () {
        usleep(random_int(500, 1000));
        return ['microtime' => microtime(true)];
    };

    // Queue the same closure 166 times; the pool spreads the calls over its children.
    $promises = [];
    for ($i = 0; $i < 166; $i++) {
        $promises[$i] = $pool->rpc(MessageFactory::rpc($callback));
    }

    // Shut the pool down once all calls have settled, whether they succeeded or failed.
    return all($promises)->always(function () use ($pool) {
        $pool->terminate();
    });
})->done(function (array $payloads) {
    foreach ($payloads as $i => $payload) {
        echo $i, ': ', $payload['microtime'], PHP_EOL;
    }
});
Use with bunny/bunny for queue processing
Combining a flexible pool with bunny/bunny creates a queue consumer that only requires resources when handling a message.
use Bunny\Async\Client;
use Bunny\Channel;
use Bunny\Message;
use WyriHaximus\React\ChildProcess\Messenger\Messages\Factory as MessageFactory;
use WyriHaximus\React\ChildProcess\Pool\Factory\Flexible;
use function React\Promise\all;

// QueueMessageHandlingChild is not shown here; it is assumed to be a ChildInterface
// implementation, like DoctrineDBAL above, that registers a `handleMessage` RPC.

// Since both set up methods return promises we'll wrap them in an `all` so we get them together when they succeed.
all([
    // Set up the flexible pool with the preferred $options.
    'pool' => Flexible::createFromClass(QueueMessageHandlingChild::class, $loop, $options),
    // Connect to RabbitMQ using Bunny and create a channel upon connecting.
    'channel' => (new Client($loop, [/** Bunny config */]))->connect()->then(function (Client $client) {
        return $client->channel();
    }),
])->done(function ($tools) {
    $pool = $tools['pool'];
    $channel = $tools['channel'];

    // Consume the messages from a queue named `queue`.
    $channel->consume(
        function (Message $message, Channel $channel) use ($pool) {
            // Upon message arrival we call the RPC `handleMessage` on the pool to handle the message
            // in a child process.
            $pool->rpc(
                MessageFactory::rpc(
                    'handleMessage',
                    [
                        'content' => $message->content,
                    ]
                )
            )->done(function () use ($message, $channel) {
                // Acknowledge the message on success
                $channel->ack($message);
            }, function () use ($message, $channel) {
                // Mark the message failed on failure so it can be retried
                $channel->nack($message);
            });
        },
        'queue'
    );
});
Conclusion
Child processes are very useful for a lot of different purposes, from image manipulation to gathering output from existing programs. These two packages were created to make such operations easier when you have a bulk of them to run. That is why I've included the bunny example, a pattern I'm actively using in production.
Bonus: rx/child-process
For those familiar with observables, rx/child-process lets you stream the STDOUT and STDERR output with reactivex/rxphp observables, and you can use all the cool ReactiveX operators on it. For those unfamiliar with observables, I strongly recommend checking out RxMarbles for visualisations of Rx operators.
P.S.
The Doctrine DBAL example featured in this post can be found fully working at wyrihaximus/react-doctrine-dbal on GitHub, ready to be experimented with.