nowucca.com - personal software technology blog

Pipeline (Processing, Task Completion)

Intent

A Pipeline organizes a task having sequential, data-dependent steps into a coherent activity.  The intent is to allow steps in the task to be developed and tested independently, and over time to allow tasks to be included and excluded when required without affecting the overall activity.  Consider use of a Pipeline when a task can be organized into sequential steps that deal with separate aspects of the task, where most steps are separate concerns, and each step operates on a common data object, usually within a single transaction.

Also Known As

Plugin Processing, Managed Chain of Responsibility

Motivation

Consider the placement of an Order with a large online retailer.  The processing of the order has many different steps, from validation, journalling, fulfillment and shipping tasks.

Applicability

Use  a pipeline when you have a sequential, data-dependent  set of steps that co-operate to perform an overall coherent activity.  The steps can depend on each other using data, but are otherwise independent.  Often, large methods that perform a complex task can be refactored into a series of smaller, private method calls with data-dependence.  Consider application of this pattern when there is a single underlying object that can be used to model all the data from the tasks, or to replace a co-operative set of private methods within a class to objectify the implementation strategy.

A common situation is order processing code, where one can identify a large Order object upon which many different steps are applied.  Another common use of this pattern is the forwarding of a web server Request through filters and servlets, where each filter and servlet has a separate concern, and each may place data into the  Request model along the way.

Structure

There are three objects.  A Pipeline can have one or more Tasks, and share a Data object amongst those tasks.

Pipeline
  Task +
Data

Participants

Pipeline

Task

Data

Collaborations

The Pipeline class manages the tasks, and each task operates on the Data object to complete the activity required.

Consequences

The Pipeline undertakes the execution of a coherent activity to completion or failure.  It achieves this by executing a series of tasks in sequence, and updatinga shared Data object with results of the activity.  Tasks can be added and removed statically or dynamically at runtime, and the intent is for the tasks to be independent and testable.

  1. Task Complexity. One must decide when designing a Pipeline the complexity of code in the tasks. Typically, smaller coherent tasks are preferred where possible, although some designs call for more complex processing to avoid an unnecessary number of task objects.
  2. Task Coherence. The work of the task should as far as possible be dependent upon the Data object being processed and be a coherent unit of work, independent of other tasks.  For non-critical task objects, one way to test coherence is to as whether it is possible to remove the task and still perform the overall activity?  For critical tasks, the nature of the problem being solved should be used as a guide to the coherence.
  3. Task Independence. Each task object should as far as possible be independently testable.  That is, a unit test case should be able to be written whereby only the Task objects and its dependents are required.
  4. Execution semantics. An execution semantics for the Pipeline is whether the tasks care executed sequentially, or in parallel, and also how failures and retries for tasks are handled.  The Pipeline object is responsible for defining these semantics, in conjunction with the Task interface.  

Implementation

What pitfalls, hints, or techniques should you be aware of when implementing the
pattern? Are there language-specific issues?

  1. Transactional concerns. Task objects are not all created equal - some are 'critical' to the activity, and others are ancillary.  It is up to the Pipeline designer to handle failures of critical and ancillary tasks appropriately for their domain.  Often separate pipelines are used to demarcate transactional boundaries.  Additionally, if a transaction can span multiple tasks, care must be made to avoid accidentally live-locking on table or row locks with separate connections.
  2. Atomic Data Updates. Some Task computations are complex, and may fail after a large amount of processing has occurred.  To avoid partial updates tot he shared Data objects, a common strategy is to build up Data changes in a separate temporary Data object, and upon success, apply or 'commit' the temporary changes to the shared Data object.  Upon failure, the  partial results in the temporary Data object can be logged and/or discarded.
  3. Pipeline Configuration. Each Pipeline can have a set of configuration parameters, potentially used to change execution semantics or to include or exclude Tasks from the Pipeline at runtime, or to change the transactional nature of the Pipeline if required.
  4. Shared System State. One must take care to arrange for a degree of task independence - that is, if the tasks share system resources beyond the Data object, such as transactional resources, it should be provable that no interference can occur between shared resources.
  5. Composite Pipelines. It is possible to nest the execution of one Pipeline (the child) into a Task in another Pipeline (the parent).  This can be used as a grouping mechanism for related tasks, and also as a way to vary Pipeline Configuration parameters.  A common pattern is to use up to three child Pipelines in a 'Pre-', 'Core-', 'Post-' parent pipeline, where the Post- pipeline tasks are in a different transaction for more complex activities.

Sample Code and Usage

public class PlaceOrderPipeline {
     @Autowired List<OrderPlacementTask> tasks;
     @Transactional(rollbackFor={PipelineException.class}
      public void placeOrder(OrderData data) throws PipelineException {
         for (OrderPlacementTask task: tasks) {
             task.execute(data);
         }
    }

    OrderPlacementPipeline pipeline = Container.get("OrderPlacementPipelineBean");
    OrderData order = // build orderData object from user inputs
    try {
        pipeline.execute(order);
    } catch (PipelineException p) {
        // recover and/or pass this up
    }

Known Uses

Http Servlet Request: In this pipeline the tasks are servlets, and the Data obejct is the Http Request, and the Pipeline's activity is the handling of the request.

Order Placement: When a customer signs up for Netflix, the processing is performed using a Pipeline pattern.

Related Patterns

A chain of responsibility is similar to a pipeline.  The main differenceis that with a Pipeline there is a notion of a controller managing the process that has configuration parameters, and that can include or exclude tasks at runtime, whereas a chain of responsibility localizes the decision to the Tasks as to which Task executes next.  Use a Pipeline when you desired confirable parameters for the activity, when there is a clear sharable Data object and the tasks need to be dynamically controllable.

A Pipeline in that sense is like a mediated chain of responsibility.