Supervisor-Worker
The Supervisor-Worker pattern addresses the challenges of managing and maintaining long-running processes or tasks. A Supervisor component is responsible for monitoring and controlling one or more Worker components. The Workers perform the actual work, while the Supervisor ensures that Workers stay alive, restarts them if they fail, and handles failures gracefully. This separation of concerns enhances the reliability and resilience of the system.
Usage
This pattern is widely used in distributed systems, microservices architectures, and any scenario requiring asynchronous task processing with guaranteed execution. Specifically, it’s beneficial when: tasks are time-consuming, workers may encounter unpredictable failures, resilience is crucial for system stability, and monitoring/control of worker state is needed. Common applications include background job processing, data ingestion pipelines, and managing worker nodes in a cluster.
Examples
- Kubernetes: Kubernetes utilizes the Supervisor-Worker pattern extensively. The Control Plane (Supervisor) manages Pods (Workers): when a Pod fails, controllers such as ReplicaSets create replacements so the desired number of replicas is always running, while the kubelet restarts failed containers. Liveness probes define the criteria for deciding that a worker has failed.
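As an illustrative sketch (the Deployment name, image, and probe endpoint below are placeholders, not drawn from any real deployment), a Deployment whose controller maintains three replicas while a liveness probe supplies the failure criterion:

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: web-worker              # placeholder name
spec:
  replicas: 3                   # the controller (Supervisor) maintains this count
  selector:
    matchLabels:
      app: web-worker
  template:
    metadata:
      labels:
        app: web-worker
    spec:
      containers:
        - name: web-worker
          image: example/web-worker:1.0   # placeholder image
          livenessProbe:                  # failure criterion for the Worker
            httpGet:
              path: /healthz
              port: 8080
            initialDelaySeconds: 5
            periodSeconds: 10
```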
- Celery (Python): Celery is a distributed task queue system. Client code submits tasks to a broker (often Redis or RabbitMQ), and Celery worker processes execute them. Each worker's parent process acts as a Supervisor over its pool of child processes: if a child dies or becomes unresponsive, the parent replaces it. The system also handles task distribution and result retrieval.
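As a deployment sketch (the module name `tasks` is a placeholder; Celery and a broker must already be configured), the command below starts a worker whose parent process supervises four prefork child processes, replacing any that die:

```sh
celery -A tasks worker --concurrency=4 --loglevel=info
```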
- Systemd (Linux): Systemd is a system and service manager for Linux. It functions as a Supervisor, managing services (Workers). Systemd defines a configuration for each service, including restart policies (e.g., “on-failure”) that dictate how a service should be handled if it terminates unexpectedly, effectively embodying the Supervisor-Worker pattern at the OS level.
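A minimal unit-file sketch (the service name and binary path are hypothetical) showing the restart policy and its rate limiting:

```ini
# /etc/systemd/system/example-worker.service  (hypothetical name and path)
[Unit]
Description=Example supervised worker
# Give up if the service fails 5 times within 60 seconds
StartLimitIntervalSec=60
StartLimitBurst=5

[Service]
ExecStart=/usr/local/bin/example-worker
# Restart only on unclean exit, signal, or timeout; wait 2s between attempts
Restart=on-failure
RestartSec=2

[Install]
WantedBy=multi-user.target
```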
Specimens
10 implementations

The Supervisor-Worker pattern manages a team of worker coroutines from a supervising coroutine. The supervisor ensures that a failing worker does not take its siblings down with it: inside a supervisorScope, a child's failure is isolated instead of cancelling the whole scope, which prevents cascading failures and maintains application stability. The Kotlin implementation uses supervisorScope to create the supervising context, and each worker is started with launch inside that scope. Note that supervisorScope only isolates failures; it does not restart failed workers, so each worker below handles its own exceptions, and restart or retry logic would need to be added explicitly. This is idiomatic Kotlin because it leverages coroutines and structured concurrency for asynchronous task management while keeping the code concise and readable.
import kotlinx.coroutines.*

fun main() = runBlocking {
    supervisorScope {
        val worker1 = launch {
            try {
                repeat(5) {
                    println("Worker 1: Doing work ${it + 1}")
                    delay(500)
                    if (it == 2) throw Exception("Worker 1 failed!")
                }
                println("Worker 1: Finished")
            } catch (e: Exception) {
                println("Worker 1: Error - ${e.message}")
            }
        }
        val worker2 = launch {
            println("Worker 2: Starting")
            delay(1000)
            println("Worker 2: Finished")
        }
        val worker3 = launch {
            try {
                repeat(3) {
                    println("Worker 3: Doing work ${it + 1}")
                    delay(750)
                }
                println("Worker 3: Finished")
            } catch (e: Exception) {
                println("Worker 3: Error - ${e.message}")
            }
        }
        worker1.join()
        worker2.join()
        worker3.join()
    }
    println("All workers completed (or supervisor finished handling failures).")
}
The Supervisor-Worker pattern decouples task management (the Supervisor) from task execution (the Workers). The Supervisor distributes work to a pool of Workers, typically via a channel. This allows for concurrency, resilience (workers can fail independently), and scalability.
This Rust implementation uses std::thread for worker threads and std::sync::mpsc for message passing. The Supervisor spawns a fixed number of Worker threads, each owning the receiving end of its own channel, and dispatches tasks (represented as strings here) round-robin over the corresponding senders. The workers receive and print the tasks. Rust’s ownership and borrowing system manages data safety between threads, eliminating the need for explicit locks in this simple case. Channels provide a clear, idiomatic way to pass data between threads without shared mutable state.
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

const NUM_WORKERS: usize = 4;

struct Supervisor {
    senders: Vec<mpsc::Sender<String>>,
    handles: Vec<thread::JoinHandle<()>>,
    next: usize, // round-robin cursor
}

impl Supervisor {
    fn new() -> Self {
        let mut senders = Vec::new();
        let mut handles = Vec::new();
        for id in 0..NUM_WORKERS {
            // One channel per worker: the supervisor keeps the sender,
            // the worker thread owns the receiver.
            let (tx, rx) = mpsc::channel();
            senders.push(tx);
            handles.push(thread::spawn(move || Worker::run(id, rx)));
        }
        Supervisor { senders, handles, next: 0 }
    }

    fn dispatch(&mut self, task: String) {
        self.senders[self.next].send(task).unwrap();
        self.next = (self.next + 1) % self.senders.len();
    }

    fn shutdown(self) {
        // Dropping the senders closes the channels, so recv() returns Err
        // and each worker's loop exits.
        drop(self.senders);
        for handle in self.handles {
            handle.join().unwrap();
        }
    }
}

struct Worker;

impl Worker {
    fn run(id: usize, rx: mpsc::Receiver<String>) {
        loop {
            match rx.recv() {
                Ok(task) => {
                    println!("Worker {} processing task: {}", id, task);
                    // Simulate work
                    thread::sleep(Duration::from_millis(500));
                }
                Err(_) => {
                    println!("Worker {} shutting down", id);
                    break;
                }
            }
        }
    }
}

fn main() {
    let mut supervisor = Supervisor::new();
    for i in 1..=5 {
        supervisor.dispatch(format!("Task {}", i));
    }
    // Closing the channels and joining replaces a fixed sleep-based wait.
    supervisor.shutdown();
}
The Supervisor-Worker pattern distributes tasks to multiple worker goroutines, managed by a supervisor. The supervisor receives tasks from a channel, dispatches them to available workers, and collects results. This pattern enhances concurrency and responsiveness by preventing the main goroutine from blocking on long-running operations.
This Go implementation uses channels for communication between the supervisor and workers. The supervisor function spawns a pool of worker goroutines, dispatches tasks on a shared tasks channel, and aggregates results from a results channel. Workers range over the tasks channel, process each task, and send results back. This approach is idiomatic Go because it relies on goroutines and channels for concurrent communication, avoiding explicit locking and following the “share memory by communicating” philosophy.
package main

import (
	"fmt"
	"sync"
)

// Task represents a unit of work.
type Task struct {
	ID      int
	Payload int
}

// Result represents the outcome of a task.
type Result struct {
	TaskID int
	Value  int
	Error  error
}

// worker processes tasks from the shared tasks channel.
func worker(id int, tasks <-chan Task, results chan<- Result, wg *sync.WaitGroup) {
	defer wg.Done()
	for task := range tasks {
		// Simulate work
		value := task.Payload * 2
		results <- Result{TaskID: task.ID, Value: value}
		fmt.Printf("Worker %d processed task %d: %d -> %d\n", id, task.ID, task.Payload, value)
	}
}

// supervisor dispatches tasks to workers and collects results.
func supervisor(numWorkers int, tasks chan Task, results chan Result, wg *sync.WaitGroup) {
	// Create worker pool
	for i := 0; i < numWorkers; i++ {
		go worker(i+1, tasks, results, wg)
	}
	// Dispatch from a separate goroutine so the collection loop below can
	// drain the unbuffered results channel concurrently; dispatching and
	// collecting sequentially in the same goroutine would deadlock.
	go func() {
		for i := 1; i <= 10; i++ {
			tasks <- Task{ID: i, Payload: i}
		}
		close(tasks) // Signal workers that no more tasks are coming
	}()
	// Collect results
	for i := 0; i < 10; i++ {
		result := <-results
		fmt.Printf("Received result for task %d: %d\n", result.TaskID, result.Value)
	}
}

func main() {
	numWorkers := 3
	tasks := make(chan Task)
	results := make(chan Result)
	var wg sync.WaitGroup
	wg.Add(numWorkers)
	go supervisor(numWorkers, tasks, results, &wg)
	wg.Wait()
	close(results)
}
The Supervisor-Worker pattern distributes tasks to multiple worker threads managed by a supervisor thread. The supervisor creates and manages a pool of worker threads, assigns them tasks via a queue, and handles results or errors. This improves responsiveness and utilizes multi-core processors.
The C implementation uses POSIX threads (pthreads) for concurrency. A shared queue (implemented using a simple linked list and mutex/condition variable for synchronization) holds tasks. The supervisor thread adds tasks to the queue, and worker threads continuously attempt to dequeue and execute them. The task structure contains a function pointer and its arguments, allowing for flexible task execution. This approach is common in C for managing threads and shared resources, prioritizing explicit synchronization for safety and performance.
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>

// Task structure
typedef struct {
    void (*function)(void*);
    void* argument;
} Task;

// Queue node
typedef struct QueueNode {
    Task task;
    struct QueueNode* next;
} QueueNode;

// Queue structure
typedef struct {
    QueueNode* head;
    QueueNode* tail;
    pthread_mutex_t mutex;
    pthread_cond_t cond;
} Queue;

// Initialize the queue
void queueInit(Queue* queue) {
    queue->head = NULL;
    queue->tail = NULL;
    pthread_mutex_init(&queue->mutex, NULL);
    pthread_cond_init(&queue->cond, NULL);
}

// Add a task to the queue
void queueAdd(Queue* queue, Task task) {
    pthread_mutex_lock(&queue->mutex);
    QueueNode* newNode = malloc(sizeof(QueueNode));
    if (newNode) {
        newNode->task = task;
        newNode->next = NULL;
        if (queue->tail) {
            queue->tail->next = newNode;
        } else {
            queue->head = newNode;
        }
        queue->tail = newNode;
        pthread_cond_signal(&queue->cond); // Signal a waiting worker
    }
    pthread_mutex_unlock(&queue->mutex);
}

// Remove a task from the queue
Task queueRemove(Queue* queue) {
    pthread_mutex_lock(&queue->mutex);
    while (!queue->head) {
        pthread_cond_wait(&queue->cond, &queue->mutex); // Wait for a task
    }
    QueueNode* temp = queue->head;
    Task task = temp->task;
    queue->head = queue->head->next;
    if (!queue->head) {
        queue->tail = NULL;
    }
    free(temp);
    pthread_mutex_unlock(&queue->mutex);
    return task;
}

// Worker thread function
void* workerThread(void* arg) {
    Queue* queue = (Queue*)arg;
    while (1) {
        Task task = queueRemove(queue);
        task.function(task.argument);
    }
    return NULL;
}

// Example task function: prints, then frees the heap-allocated message
void printMessage(void* message) {
    printf("Worker: %s\n", (char*)message);
    sleep(1); // Simulate work
    free(message);
}

int main(void) {
    Queue queue;
    queueInit(&queue);
    const int numWorkers = 3;
    pthread_t workers[numWorkers];
    // Create worker threads
    for (int i = 0; i < numWorkers; i++) {
        pthread_create(&workers[i], NULL, workerThread, &queue);
    }
    // Add tasks from the supervisor. Each message must be heap-allocated:
    // a stack buffer would be reused (and go out of scope) before a worker
    // gets to read it.
    for (int i = 0; i < 5; i++) {
        char* message = malloc(20);
        snprintf(message, 20, "Message %d", i);
        Task task = {printMessage, message};
        queueAdd(&queue, task);
    }
    // Wait for worker threads to finish (in this example, they run indefinitely)
    for (int i = 0; i < numWorkers; i++) {
        pthread_join(workers[i], NULL);
    }
    return 0;
}
The Supervisor-Worker pattern distributes tasks among a pool of worker threads managed by a supervisor thread. The supervisor assigns tasks from a queue to available workers, ensuring work is processed concurrently without overwhelming the system. This implementation uses a std::queue for task management and std::thread for worker threads. C++’s standard library provides robust threading primitives, making this a natural fit. The use of a shared queue and condition variables allows for efficient communication and synchronization between the supervisor and workers, avoiding busy-waiting. The worker function is a simple loop that processes tasks until signaled to stop.
#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <atomic>
#include <chrono>
#include <vector>

class Supervisor {
public:
    explicit Supervisor(int num_workers) : num_workers_(num_workers), shutdown_(false) {}

    void start() {
        workers_.reserve(num_workers_);
        for (int i = 0; i < num_workers_; ++i) {
            workers_.emplace_back([this, i]() { worker_function(i); });
        }
    }

    void stop() {
        {
            std::lock_guard<std::mutex> lock(queue_mutex_);
            shutdown_ = true;
        }
        condition_.notify_all(); // Wake up all workers to exit
        for (auto& worker : workers_) {
            worker.join();
        }
    }

    void submit_task(const std::function<void()>& task) {
        {
            std::lock_guard<std::mutex> lock(queue_mutex_);
            task_queue_.push(task);
        }
        condition_.notify_one(); // Wake up a worker
    }

private:
    void worker_function(int worker_id) {
        while (true) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(queue_mutex_);
                condition_.wait(lock, [this] { return shutdown_ || !task_queue_.empty(); });
                // Note: tasks still queued at shutdown are dropped.
                if (shutdown_) break;
                task = std::move(task_queue_.front());
                task_queue_.pop();
            }
            task();
        }
        std::cout << "Worker " << worker_id << " shutting down." << std::endl;
    }

    std::queue<std::function<void()>> task_queue_;
    std::mutex queue_mutex_;
    std::condition_variable condition_;
    std::vector<std::thread> workers_;
    int num_workers_;
    std::atomic<bool> shutdown_; // atomic: also read outside the lock
};

int main() {
    Supervisor supervisor(4);
    supervisor.start();
    for (int i = 0; i < 10; ++i) {
        supervisor.submit_task([i]() {
            std::cout << "Task " << i << " executed by thread " << std::this_thread::get_id() << std::endl;
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        });
    }
    supervisor.stop();
    return 0;
}
The Supervisor-Worker pattern delegates complex tasks to multiple worker threads managed by a supervisor thread. The supervisor distributes work, monitors worker status, and handles results or failures. This improves responsiveness and utilizes multi-core processors.
This C# implementation uses a Supervisor class to manage a pool of Worker threads. The Supervisor accepts tasks via a queue and assigns them to available workers. Workers process tasks and return results to the supervisor. The use of Task and async/await is idiomatic C# for asynchronous operations and thread management, avoiding explicit thread handling where possible. A BlockingCollection provides a thread-safe queue for task distribution.
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public class Supervisor
{
    private readonly int _workerCount;
    private readonly BlockingCollection<Func<Task<string>>> _taskQueue;
    private readonly Task[] _workers;

    public Supervisor(int workerCount)
    {
        _workerCount = workerCount;
        _taskQueue = new BlockingCollection<Func<Task<string>>>();
        _workers = new Task[_workerCount];
        for (int i = 0; i < _workerCount; i++)
        {
            _workers[i] = Task.Run(async () => await WorkerTask());
        }
    }

    public void SubmitTask(Func<Task<string>> task)
    {
        _taskQueue.Add(task);
    }

    private async Task WorkerTask()
    {
        // GetConsumingEnumerable ends cleanly after CompleteAdding() is
        // called and the queue drains; a bare Take() would throw at shutdown.
        foreach (Func<Task<string>> task in _taskQueue.GetConsumingEnumerable())
        {
            try
            {
                string result = await task();
                Console.WriteLine($"Worker {Thread.CurrentThread.ManagedThreadId} completed task with result: {result}");
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Worker {Thread.CurrentThread.ManagedThreadId} failed with: {ex.Message}");
            }
        }
    }

    public void Stop()
    {
        _taskQueue.CompleteAdding();
        Task.WaitAll(_workers);
    }
}

public class Example
{
    public static async Task Main(string[] args)
    {
        var supervisor = new Supervisor(3);
        for (int i = 0; i < 10; i++)
        {
            int taskNumber = i;
            supervisor.SubmitTask(async () =>
            {
                await Task.Delay(100); // Simulate some work
                return $"Task {taskNumber} completed";
            });
        }
        await Task.Delay(500); // Allow tasks to complete
        supervisor.Stop();
    }
}
The Supervisor-Worker pattern delegates complex or potentially blocking tasks to worker threads, preventing the main thread from freezing and maintaining responsiveness. A supervisor thread manages the workers, distributing work and collecting results. This is particularly useful in TypeScript/JavaScript environments where the single-threaded nature of the main event loop can cause UI unresponsiveness.
This implementation uses Node.js’s worker_threads module. The Supervisor class creates and manages Worker instances, sending them messages with tasks. Workers perform the task and return the result. The Supervisor then handles the results. TypeScript’s type safety is leveraged to define the message types exchanged between supervisor and workers, improving code clarity and preventing runtime errors. Using classes and async/await aligns with modern TypeScript best practices for managing asynchronous operations and thread communication.
// supervisor.ts
import { Worker } from 'worker_threads';
import * as path from 'path';

export type Task = {
  type: 'calculatePi';
  iterations: number;
};

export type Result = {
  type: 'piResult';
  value: number;
};

export class Supervisor {
  private workers: Worker[] = [];
  private numWorkers: number;
  private nextWorker = 0; // round-robin cursor

  constructor(numWorkers: number) {
    this.numWorkers = numWorkers;
    this.initializeWorkers();
  }

  private initializeWorkers(): void {
    for (let i = 0; i < this.numWorkers; i++) {
      // Note: Node cannot execute .ts worker files directly; run under a
      // TypeScript-aware loader (e.g. ts-node) or point at the compiled .js.
      const worker = new Worker(path.resolve(__dirname, 'worker.ts'));
      this.workers.push(worker);
      worker.on('message', (result: Result) => {
        console.log(`Received result: ${result.value}`);
      });
      worker.on('error', (err) => {
        console.error(`Worker error: ${err}`);
      });
      worker.on('exit', (code) => {
        console.log(`Worker exited with code ${code}`);
      });
    }
  }

  public sendTask(task: Task): void {
    // Round-robin task distribution
    const worker = this.workers[this.nextWorker];
    this.nextWorker = (this.nextWorker + 1) % this.workers.length;
    worker.postMessage(task);
  }
}

// worker.ts
import { parentPort } from 'worker_threads';
import { Task, Result } from './supervisor';

parentPort?.on('message', (task: Task) => {
  if (task.type === 'calculatePi') {
    const pi = calculatePi(task.iterations);
    const result: Result = { type: 'piResult', value: pi };
    parentPort?.postMessage(result);
  }
});

// Leibniz series approximation of pi
function calculatePi(iterations: number): number {
  let pi = 0;
  let sign = 1;
  for (let i = 0; i < iterations; i++) {
    pi += sign / (2 * i + 1);
    sign *= -1;
  }
  return pi * 4;
}

// main.ts
import { Supervisor } from './supervisor';

const supervisor = new Supervisor(2);
for (let i = 0; i < 5; i++) {
  const iterations = 1000000 + i * 200000;
  setTimeout(() => {
    supervisor.sendTask({ type: 'calculatePi', iterations });
  }, i * 500);
}
The Supervisor-Worker pattern delegates computationally intensive or blocking tasks to worker threads, preventing the main thread from freezing and maintaining a responsive user interface. A supervisor manages the workers, distributing tasks and collecting results. This example uses web workers in JavaScript. The supervisor creates a worker, sends it a message with the task (calculating a factorial), and receives the result via a message event. This approach is idiomatic JavaScript because it leverages the event-driven, non-blocking nature of the language and the browser’s web worker API to achieve concurrency without true multithreading.
// supervisor.js
const worker = new Worker('worker.js');

function calculateFactorial(n) {
  return new Promise((resolve, reject) => {
    worker.postMessage({ task: 'factorial', data: n });
    worker.onmessage = (event) => {
      if (event.data.error) {
        reject(event.data.error);
      } else {
        resolve(event.data.result);
      }
    };
    worker.onerror = (error) => {
      reject(error);
    };
  });
}

// Example usage:
calculateFactorial(5)
  .then(result => console.log(`Factorial of 5 is: ${result}`))
  .catch(error => console.error(`Error calculating factorial: ${error}`));

// worker.js
self.onmessage = (event) => {
  const { task, data } = event.data;
  if (task === 'factorial') {
    let result = 1;
    for (let i = 2; i <= data; i++) {
      result *= i;
    }
    self.postMessage({ result: result });
  }
};
The Supervisor-Worker pattern distributes tasks to multiple worker processes, managed by a supervisor. This improves performance and responsiveness by leveraging parallelism. The supervisor handles task queuing and distribution, while workers independently process assigned tasks.
This Python implementation uses a multiprocessing.JoinableQueue for task passing and multiprocessing.Process to create worker processes. The Supervisor class manages the queue and starts the workers. Workers continuously retrieve tasks from the queue and execute them, marking each with task_done() so the supervisor can join() the queue during shutdown. Queues are a natural fit for Python’s multiprocessing, providing process-safe communication. The class-based structure promotes organization and reusability, aligning with Python’s OOP capabilities, and the if __name__ == '__main__': guard is crucial for multiprocessing to function correctly on all platforms.
import multiprocessing


def worker_function(task_queue, worker_id):
    """Worker function to process tasks from the queue."""
    while True:
        task = task_queue.get()
        if task is None:
            task_queue.task_done()  # Account for the sentinel, or join() never returns
            break  # Signal to terminate
        try:
            result = task(worker_id)
            print(f"Worker {worker_id}: Processed task, result: {result}")
        except Exception as e:
            print(f"Worker {worker_id}: Error processing task: {e}")
        task_queue.task_done()  # Indicate task completion


class Supervisor:
    """Manages a pool of worker processes."""

    def __init__(self, num_workers):
        self.task_queue = multiprocessing.JoinableQueue()
        self.workers = []
        self.num_workers = num_workers

    def start(self):
        """Starts the worker processes."""
        for i in range(self.num_workers):
            worker = multiprocessing.Process(target=worker_function, args=(self.task_queue, i))
            self.workers.append(worker)
            worker.start()

    def add_task(self, task):
        """Adds a task to the queue."""
        self.task_queue.put(task)

    def shutdown(self):
        """Signals workers to terminate and waits for them to finish."""
        for _ in range(self.num_workers):
            self.task_queue.put(None)  # Sentinel value to signal termination
        self.task_queue.join()  # Wait for all tasks to be processed
        for worker in self.workers:
            worker.join()


def my_task(worker_id):
    """A sample task. Defined at module level so it can be pickled on
    spawn-based platforms (Windows, macOS)."""
    return worker_id * 2


if __name__ == '__main__':
    supervisor = Supervisor(num_workers=3)
    supervisor.start()
    for i in range(10):
        supervisor.add_task(my_task)
    supervisor.shutdown()
    print("All tasks completed.")
The Supervisor-Worker pattern distributes tasks to multiple worker threads managed by a supervisor thread. The supervisor maintains a work queue and assigns tasks to idle workers. This promotes concurrency and responsiveness by preventing the main thread from blocking on long-running operations.
The Java implementation uses a Supervisor class to manage a queue of WorkItem tasks and a pool of Worker threads. Workers continuously check the queue for tasks and execute them. The WorkItem is a functional interface representing a unit of work. This approach leverages Java’s threading capabilities and functional interfaces for a clean and efficient solution. Using a blocking queue ensures thread safety and efficient task distribution.
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.List;
import java.util.ArrayList;

interface WorkItem {
    void execute();
}

class Supervisor {
    private final BlockingQueue<WorkItem> workQueue;
    private final List<Worker> workers;

    public Supervisor(int numWorkers) {
        this.workQueue = new LinkedBlockingQueue<>();
        this.workers = new ArrayList<>();
        for (int i = 0; i < numWorkers; i++) {
            Worker worker = new Worker(workQueue);
            workers.add(worker);
            worker.start();
        }
    }

    public void submit(WorkItem workItem) {
        workQueue.add(workItem);
    }

    public void shutdown() {
        workers.forEach(Worker::interrupt);
    }
}

class Worker extends Thread {
    private final BlockingQueue<WorkItem> workQueue;

    public Worker(BlockingQueue<WorkItem> workQueue) {
        this.workQueue = workQueue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                WorkItem workItem = workQueue.take();
                workItem.execute();
            } catch (InterruptedException e) {
                break; // Allow thread to exit
            }
        }
    }
}

// Example Usage
public class Main {
    public static void main(String[] args) throws InterruptedException {
        Supervisor supervisor = new Supervisor(3);
        for (int i = 0; i < 10; i++) {
            final int taskNumber = i;
            supervisor.submit(() -> System.out.println("Executing task: " + taskNumber + " by " + Thread.currentThread().getName()));
            Thread.sleep(100); // Simulate task submission rate
        }
        Thread.sleep(1000); // Allow tasks to complete
        supervisor.shutdown();
    }
}