Mike Rockétt

Pipelines for PHP

Some months back, I forked the League’s Pipeline package to give it a little more oomph.

#PHP — 31 July 2021

Design patterns are becoming ever more important, as developers continue to build more complex applications that require a fluid-yet-robust backbone.

One such design pattern is the pipeline pattern, which has been implemented in a variety of different ways across languages, frameworks and tools. At language level, pipelines are functional tools that let you take a value of some kind and pass it through a chain of functions to produce a result. At framework level, where the language does not support pipelines, they’re implemented as tools that do something similar – that is, they have the same outcome.
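To make the idea concrete, here is a minimal sketch of a userland pipeline built on array_reduce. The pipeline() helper and the $slugify example are made up for illustration and are not part of any package mentioned in this post.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of any package): returns a callable that
 * passes a value through each stage in order, feeding each result to the next.
 */
function pipeline(callable ...$stages): callable
{
    return static fn ($traveler) => array_reduce(
        $stages,
        static fn ($carry, callable $stage) => $stage($carry),
        $traveler
    );
}

// Three stages: trim whitespace, lowercase, then replace spaces with dashes.
$slugify = pipeline(
    'trim',
    'strtolower',
    static fn (string $value): string => str_replace(' ', '-', $value)
);

echo $slugify('  Pipelines for PHP  '), PHP_EOL; // pipelines-for-php
```

The value on the left goes in, each function hands its result to the next, and the final result comes out the other end.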

A major benefit of using this pattern is that code is just easier to write, and to read – depending on the implementation, of course.

In PHP-land, we don’t have the luxury of language-level pipelines – RFCs have come and gone, and it doesn’t look like we’re going to get them any time soon. The same goes for generics, which seem to be impossible to implement in the language.

In order to use this pattern in PHP, we have to resort to tools, and reach for packages. Laravel has a built-in pipeline implementation, and the League has quite a popular one, too. It is the latter that seems to be the cleanest approach to me, which is why I decided to fork it (later detached) and add some oomph to it.

The original package includes the concept of a pipeline processor, which is simply a class that gets used when processing the pipeline – it determines how the pipeline is run. At present, the package supports a FingersCrossedProcessor, which runs all pipes until it’s done or an exception is thrown, as well as an InterruptibleProcessor, which allows you to halt the pipeline if a certain condition resolves to true. Unfortunately, the interruptible processor is not documented, and only gives consideration to one-way condition-matching.
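Roughly speaking, the difference between the two processors can be sketched as follows. This is a simplified illustration rather than the package’s actual source: the Processor interface, RunAllProcessor and HaltingProcessor are hypothetical names, and the halting check here stops the run when it returns true.

```php
<?php

declare(strict_types=1);

// Hypothetical contract for this sketch; the package's real interface may differ.
interface Processor
{
    public function process($traveler, callable ...$stages);
}

// Runs every stage in order; an exception thrown by a stage simply propagates.
final class RunAllProcessor implements Processor
{
    public function process($traveler, callable ...$stages)
    {
        foreach ($stages as $stage) {
            $traveler = $stage($traveler);
        }

        return $traveler;
    }
}

// Stops early as soon as the check returns true for a stage's output.
final class HaltingProcessor implements Processor
{
    /** @var callable */
    private $shouldHalt;

    public function __construct(callable $shouldHalt)
    {
        $this->shouldHalt = $shouldHalt;
    }

    public function process($traveler, callable ...$stages)
    {
        foreach ($stages as $stage) {
            $traveler = $stage($traveler);

            if (($this->shouldHalt)($traveler) === true) {
                break;
            }
        }

        return $traveler;
    }
}

$double = static fn (int $n): int => $n * 2;

$all = (new RunAllProcessor())->process(1, $double, $double, $double);
$halted = (new HaltingProcessor(static fn (int $n): bool => $n >= 4))
    ->process(1, $double, $double, $double);

var_dump($all, $halted); // int(8), then int(4)
```

The stages stay the same in both runs; only the processor decides whether the third stage ever gets called.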

My fork corrects these issues, and adds another processor, called the TapProcessor, which allows you to tap into each stage and do something, like write events to a log.

Changes to InterruptibleProcessor

This processor can now be built in three ways:

  • Using new InterruptibleProcessor($callback), which allows interruption when the result of the callback is truthy.
  • Using InterruptibleProcessor::continueWhen($callback), which does the same thing, only using a “static constructor” in order to be a little more expressive/readable.
  • Using InterruptibleProcessor::continueUnless($callback), which does the same thing as continueWhen, but tells the processor to invert the condition.

This allows you to write more expressive code:

$processor = InterruptibleProcessor::continueWhen(
  fn ($traveler) => $traveler->shouldProceed()
);

// or

$processor = InterruptibleProcessor::continueUnless(
  fn ($traveler) => $traveler->responseAlreadyPrepared()
);

Here, we assume the $traveler is a class-instance that contains those methods.

Tip: It’s recommended to create your own stage-contracts so that you can type-hint the traveler safely.
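As a rough sketch of what such a contract could look like, assuming stages are invokable classes: the RequestTraveler class, the RequestStage interface and the Authenticate stage below are all hypothetical names for illustration, not part of the package.

```php
<?php

declare(strict_types=1);

// Hypothetical traveler; shouldProceed() mirrors the method assumed earlier.
final class RequestTraveler
{
    public bool $authenticated = false;

    public function shouldProceed(): bool
    {
        return $this->authenticated;
    }
}

// Hypothetical stage contract: every stage receives and returns a RequestTraveler.
interface RequestStage
{
    public function __invoke(RequestTraveler $traveler): RequestTraveler;
}

final class Authenticate implements RequestStage
{
    public function __invoke(RequestTraveler $traveler): RequestTraveler
    {
        // A real stage would verify credentials; this one just flips the flag.
        $traveler->authenticated = true;

        return $traveler;
    }
}

$traveler = (new Authenticate())(new RequestTraveler());

var_dump($traveler->shouldProceed()); // bool(true)
```

With the type-hint in place, passing anything other than a RequestTraveler into a stage fails immediately with a TypeError, instead of surfacing later as a call to a missing method.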

Addition of TapProcessor

The below is taken directly from the readme:

Using this processor, you can invoke an action before and/or after a stage is piped through a pipeline. This can be useful if you would like to handle common side-effects outside of each stage, such as logging or broadcasting.

The processor takes two callables:

use Rockett\Pipeline\Processors\TapProcessor;

// Define and instantiate a $logger and a $broadcaster …

$processor = new TapProcessor(
  // $beforeEach, called before a stage is piped
  static fn ($traveler) => $logger->info(
    'Traveller passing through pipeline:',
    // … remaining arguments
  ),
  // $afterEach, called after a stage is piped and the output captured
  static fn ($traveler) => $broadcaster->broadcast(
    'Something happened',
    // … remaining arguments
  )
);

$pipeline = (new Pipeline($processor))
  ->pipe(new StageOne)
  ->pipe(new StageTwo)
  ->pipe(new StageThree);

$output = $pipeline->process($traveler);

Both of these callables are optional. By excluding both, the processor will act in the exact same way as the default FingersCrossedProcessor.

If you would like to pass only one callback, then you can use the helper methods:

$processor = (new TapProcessor)->beforeEach(/** callable **/); // or …
$processor = (new TapProcessor)->afterEach(/** callable **/);

You can also chain them as an alternative to using the constructor:

$processor = (new TapProcessor)
  ->beforeEach(/** callable **/)
  ->afterEach(/** callable **/);

If you are using PHP 8 or higher, it is encouraged that you use named arguments instead:

$processor = new TapProcessor(
  beforeEach: /** optional callable **/,
  afterEach: /** optional callable **/,
);
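For a feel of what happens under the hood, the before/after behaviour can be sketched roughly like this. SketchTapProcessor is a hypothetical stand-in written for this illustration and is not the package’s TapProcessor source; it only shows the order in which the callbacks and stages run.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for illustration; not the package's TapProcessor source.
final class SketchTapProcessor
{
    /** @var callable|null */
    private $beforeEach;

    /** @var callable|null */
    private $afterEach;

    public function __construct(?callable $beforeEach = null, ?callable $afterEach = null)
    {
        $this->beforeEach = $beforeEach;
        $this->afterEach = $afterEach;
    }

    public function process($traveler, callable ...$stages)
    {
        foreach ($stages as $stage) {
            // Tap the traveler as it enters the stage.
            if ($this->beforeEach !== null) {
                ($this->beforeEach)($traveler);
            }

            $traveler = $stage($traveler);

            // Tap the stage's output once it has been captured.
            if ($this->afterEach !== null) {
                ($this->afterEach)($traveler);
            }
        }

        return $traveler;
    }
}

$log = [];

$processor = new SketchTapProcessor(
    static function ($traveler) use (&$log): void { $log[] = "before: {$traveler}"; },
    static function ($traveler) use (&$log): void { $log[] = "after: {$traveler}"; }
);

$output = $processor->process(
    1,
    static fn (int $n): int => $n + 1,
    static fn (int $n): int => $n * 10
);

print_r($log); // before: 1, after: 2, before: 2, after: 20
```

With both callbacks left out, the loop reduces to running each stage in turn, which is why the processor then behaves like a plain run-everything processor.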

One problem remains…

The current architecture of the package doesn’t accommodate the mixing of processors. That is, you cannot use an InterruptibleProcessor with a TapProcessor. I have a few ideas in mind for this (think, “piping pipeline processors”), but will give it some more thought before working on it. 🤪


If you’re using my pipeline package and have some feedback to share, please feel free to open up a new discussion on GitHub.