Cees-Jan

- Post source at 🐙

ReactPHP: Streams

Streams are the blood-vessels that transport the blood (data) pumped through your program by the heart of ReactPHP: the event-loop.

Stream

The focus of this article will be on streams. How to use them in combination with sockets and the filesystem is beyond the scope of this article and will be covered by a future articles.

Installation

The event-loop only deals with low level PHP streams, which react/stream wraps, so we have to install react/stream separately:

composer require react/stream
A Simple stream

Lets, once again, go back to the timers example. Instead of just echoing it out we'll be writing it to STDOUT instead, which is supported in PHP by opening php://stdout with fopen. (Keeping the example simple, clue/stdio-react does this better and cleaner.)

By passing the resource from fopen into a Stream object we can read from it and write to it in an asynchronous nature. Let's start with just writing. By calling $stream->write(++$i . PHP_EOL); instead of echo ++$i, PHP_EOL; we push the data into the stream's buffer. Once the resource from fopen is reading for writing, the buffer will write its buffer into the resource. You on the other hand can keep adding data into it without having the worry about when the resource is ready for writing. (The Buffer maintains a softlimit and keeps you informed when you reach that and go under it again. This is specifically useful to prevent memory from overflowing.)

<?php

require 'vendor/autoload.php';

$loop = React\EventLoop\Factory::create();

$stream = new \React\Stream\Stream(fopen('php://stdout', 'w'), $loop);

$i = 0;
$loop->addPeriodicTimer(1, function(React\EventLoop\Timer\Timer $timer) use (&$i, $loop, $stream) {
    $stream->write(++$i . PHP_EOL);

    if ($i >= 15) {
        $loop->cancelTimer($timer);
        $stream->end();
    }
});

$loop->run();
Piping

Data usually comes in from somewhere else then a timer. Consider the following example where we pipe data from STDIN directly into STDOUT. Effectively making an echo program that returns everything we put into it.

<?php

require 'vendor/autoload.php';

$loop = React\EventLoop\Factory::create();

$read = new \React\Stream\Stream(fopen('php://stdin', 'r+'), $loop);
$write = new \React\Stream\Stream(fopen('php://stdout', 'w+'), $loop);
$read->pipe($write);

$loop->run();

The beauty about piping one stream into the other is that it takes care of everything. (From the data passing from one into the other, ending the write stream when the read stream is done, and pausing when necessary.)

Rainbows and kittens

More often we want to do something with the data we get before sending it somewhere else. The example below is a very simple version of cowsay. Just like the previous example it will echo whatever we put into it back to us bit a little more fabulous and told to us by a kitten. (Even added a check for the word quit to close the streams and effectively ending our loop.)

<?php

require 'vendor/autoload.php';

$loop = React\EventLoop\Factory::create();

$superFab = new \Fab\SuperFab();

$read = new \React\Stream\Stream(fopen('php://stdin', 'r+'), $loop);
$write = new \React\Stream\Stream(fopen('php://stdout', 'w+'), $loop);

$read->on('data', function ($data, $read) use ($write, $superFab) {
    if (trim($data) == 'quit') {
        $write->close();
        $read->close();
    }

    $input = trim($data);
    $line = Kitten::get() . ' says "' . $input . '"';
    $line = $superFab->paint($line);
    $line .= PHP_EOL;
    $write->write($line);
});

$loop->run();
Community example: Phergie

As said before streams are the blood vessels of ReactPHP so this week finding community example was a lot easier then last week. But instead of two this week a big shout out to Matthew Turland creator and maintainer Phergie. Recently Phergie v3 has been completely rewritten to use ReactPHP, as an IRC bot it communicates with streams over sockets with remote IRC servers. Getting started with Phergie is a simple as creating a config file and running ./vendor/bin/phergie:

<?php
use Phergie\Irc\Connection;
return [
    'plugins' => [],
    'connections' => [
        new Connection([
            'serverHostname' => 'irc.freenode.net',
            'username' => 'Elazar',
            'realname' => 'Matthew Turland',
            'nickname' => 'Phergie3',
        ]),
    ]
];

Want Youtube support? There is a plugin for that. Twitter? There is a plugin for that. The list with available plugins is still expanding.

Examples

All the examples from this post can be found on Github.

Conclusion

Streams are one of the most important components of ReactPHP. Without them exchanging data with the filesystem, sockets, or anything else transporting data would be a lot harder and clunkier. Streams take a lot of work out of hands and they pave the way for the react/socket, react/socket-client, and everything building upon those like the react/http-client component. We'll be seeing streams a lot in the upcoming articles.


Categories: PHP - ReactPHP - ReactPHP Series Tags: ReactPHP - Streams - PHP