PATTERN
Pipeline
The Pipeline pattern breaks a complex task into a series of independent processing stages connected in sequence. Each stage takes the output of the previous stage as its input and performs a specific transformation or action. This promotes separation of concerns, making the system easier to understand, maintain, and extend.
The core idea is to avoid monolithic code that handles every aspect of a process. Instead, data “flows through the pipeline,” enabling parallel processing (where stages are not dependent on each other) and easier error handling, since issues can be isolated to specific stages.
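As a minimal sketch of the idea (the `make_pipeline` helper and the stage functions are hypothetical, written for illustration, not taken from any library), each stage is an ordinary Python function and the pipeline simply threads data through them in order; because each stage runs in isolation, a failure can be attributed to the stage that raised it:

```python
from typing import Callable

def make_pipeline(*stages: Callable):
    """Compose stage functions; the output of one becomes the input of the next."""
    def run(data):
        for stage in stages:
            try:
                data = stage(data)
            except Exception as exc:
                # Errors are isolated to the stage that raised them.
                raise RuntimeError(f"stage {stage.__name__!r} failed") from exc
        return data
    return run

# Hypothetical stages for illustration.
def strip_whitespace(lines):
    return [line.strip() for line in lines]

def drop_empty(lines):
    return [line for line in lines if line]

def count_lines(lines):
    return len(lines)

pipeline = make_pipeline(strip_whitespace, drop_empty, count_lines)
print(pipeline(["  hello ", "", " world "]))  # -> 2
```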
Usage
The Pipeline pattern is used in many scenarios:
- Data Processing: Extract, Transform, Load (ETL) processes in data warehousing heavily rely on pipelines to clean, validate, and reformat data before storing it (see the sketch after this list).
- Image/Video Processing: Applying a series of filters, adjustments, and encoding steps to multimedia assets.
- Compiler Design: Representing the phases of compilation (lexical analysis, parsing, semantic analysis, code generation).
- Workflow Automation: Orchestrating a series of actions, such as sending notifications, updating databases, and triggering other services.
- Machine Learning: Building a sequence of feature extraction, model training, and prediction steps.
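To make the ETL case concrete, here is a minimal sketch (the record fields and stage functions are invented for illustration): a clean stage normalizes names, a validate stage drops malformed records, a reformat stage reshapes the survivors, and a load stage stands in for the database write.

```python
from datetime import datetime

# Hypothetical raw records; a real ETL job would read these from a source system.
raw = [
    {"name": " Alice ", "signup": "2023-01-05"},
    {"name": "", "signup": "2023-02-30"},  # invalid: empty name, impossible date
    {"name": "Bob", "signup": "2023-03-14"},
]

def clean(records):
    return [{**r, "name": r["name"].strip()} for r in records]

def validate(records):
    valid = []
    for r in records:
        try:
            datetime.strptime(r["signup"], "%Y-%m-%d")
        except ValueError:
            continue  # drop records with malformed dates
        if r["name"]:
            valid.append(r)
    return valid

def reformat(records):
    return [(r["name"].upper(), r["signup"]) for r in records]

def load(rows):
    for row in rows:
        print("LOAD", row)  # stand-in for a database insert

# The stages run in sequence, each consuming the previous stage's output.
load(reformat(validate(clean(raw))))
```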
Examples
- Unix Pipelines: The command-line interface in Unix-like operating systems is a classic example. Commands can be chained together using the pipe symbol (`|`), where the standard output of one command becomes the standard input of the next. For example, `cat myfile.txt | grep "error" | wc -l` pipes the contents of `myfile.txt` to `grep` to filter lines containing “error”, and then to `wc` to count those lines.
- Promise Chaining (JavaScript): JavaScript Promises let you chain asynchronous operations using `.then()`. Each `.then()` represents a stage in a pipeline, receiving the result of the previous Promise and returning a new Promise. This allows code to be written in a more synchronous style while still handling asynchronous operations.

  ```javascript
  fetch('https://api.example.com/data')
    .then(response => response.json())           // Stage 1: Parse JSON
    .then(data => data.map(item => item.name))   // Stage 2: Extract names
    .then(names => console.log(names));          // Stage 3: Log names
  ```
- Airflow (Python): Apache Airflow is a platform to programmatically author, schedule, and monitor workflows. Workflows are defined as Directed Acyclic Graphs (DAGs) of tasks, effectively implementing a pipeline. Each task represents a stage that processes data generated by preceding tasks.

  ```python
  from datetime import datetime

  from airflow import DAG
  from airflow.operators.python import PythonOperator  # Airflow 2.x import path

  def task_1():
      return "Hello"

  def task_2(input_string):
      return input_string + " World"

  def task_3(input_string):
      return input_string + "!"

  with DAG(
      dag_id="pipeline_example",
      start_date=datetime(2023, 1, 1),
      schedule_interval=None,
      catchup=False,
  ) as dag:
      task1 = PythonOperator(task_id="task_1", python_callable=task_1)
      task2 = PythonOperator(
          task_id="task_2",
          python_callable=task_2,
          op_kwargs={"input_string": task1.output},
      )
      task3 = PythonOperator(
          task_id="task_3",
          python_callable=task_3,
          op_kwargs={"input_string": task2.output},
      )
  ```
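  One detail worth noting in this example: in Airflow 2.x, referencing `task1.output` (an `XComArg`) in a downstream task’s `op_kwargs` both passes the returned value via XCom and registers the upstream dependency, so no explicit `task1 >> task2 >> task3` ordering is required.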