111
behavioral integration distributed systems

Message Broker

Figure: Message Broker sequence diagram (Plate 111)

The Message Broker pattern facilitates communication and data exchange between different applications, systems, and services. It acts as an intermediary, receiving messages from producers and routing them to interested consumers. This decoupling allows components to operate independently, improving scalability, resilience, and maintainability. Instead of direct point-to-point connections, components interact through the broker, enabling asynchronous communication and flexible integration.
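
The routing role described above can be sketched in a few lines. This is a minimal in-memory illustration, not a production broker: the TopicBroker class, its method names, and the topic names are all hypothetical, and a real broker would add persistence, acknowledgements, and network transport.

```dart
import 'dart:async';

/// Hypothetical in-memory broker that routes messages by topic name.
class TopicBroker {
  // One broadcast controller per topic, created on first use.
  final _topics = <String, StreamController<String>>{};

  StreamController<String> _controllerFor(String topic) =>
      _topics.putIfAbsent(topic, () => StreamController<String>.broadcast());

  /// Consumers register interest in a topic; they never see the producer.
  Stream<String> subscribe(String topic) => _controllerFor(topic).stream;

  /// Producers hand a message to the broker; delivery is asynchronous.
  void publish(String topic, String message) =>
      _controllerFor(topic).add(message);

  /// Closes every topic after already-published messages are delivered.
  Future<void> close() =>
      Future.wait(_topics.values.map((controller) => controller.close()));
}

Future<void> main() async {
  final broker = TopicBroker();
  final received = <String>[];

  broker.subscribe('orders').listen((m) => received.add('billing got: $m'));
  broker.subscribe('orders').listen((m) => received.add('shipping got: $m'));
  broker.subscribe('payments').listen((m) => received.add('audit got: $m'));

  broker.publish('orders', 'order 1 created');
  broker.publish('payments', 'payment 1 settled');

  await broker.close();
  for (final line in received) {
    print(line);
  }
}
```

Neither producer nor consumer holds a reference to the other; both depend only on the broker and a topic name, which is the decoupling the pattern aims for.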

Usage

The Message Broker pattern is widely used in scenarios requiring loose coupling and asynchronous communication. Common use cases include:

  • Event-Driven Architectures: Systems react to events published by other components. For example, a user registration event might trigger a welcome email in one service and profile creation in another.
  • Microservices Communication: Enabling independent microservices to exchange data without direct dependencies.
  • Background Task Processing: Offloading time-consuming tasks from the main application thread to be processed asynchronously by worker services.
  • Data Streaming: Handling high-volume, real-time data streams from various sources.
  • Integration with Legacy Systems: Providing a standardized interface for integrating newer applications with older, less flexible systems.
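
The event-driven case in the list above (a user registration triggering work in separate services) could look roughly like the following. It is a sketch under stated assumptions: EventBus, UserRegistered, and the two "services" are hypothetical names, and the services are reduced to callbacks that record what they would do.

```dart
import 'dart:async';

/// Hypothetical event type; the field is illustrative only.
class UserRegistered {
  final String email;
  UserRegistered(this.email);
}

/// Hypothetical in-process event bus built on a broadcast stream.
class EventBus {
  final _controller = StreamController<Object>.broadcast();

  /// Stream of only those events whose runtime type is T.
  Stream<T> on<T>() => _controller.stream.where((e) => e is T).cast<T>();

  void publish(Object event) => _controller.add(event);

  Future<void> close() => _controller.close();
}

Future<void> main() async {
  final bus = EventBus();
  final log = <String>[];

  // Two independent services react to the same event.
  bus.on<UserRegistered>().listen(
      (e) => log.add('email service: send welcome mail to ${e.email}'));
  bus.on<UserRegistered>().listen(
      (e) => log.add('profile service: create profile for ${e.email}'));

  // The registration code only publishes; it does not know who reacts.
  bus.publish(UserRegistered('ada@example.com'));

  await bus.close();
  for (final line in log) {
    print(line);
  }
}
```

Adding a third reaction (say, analytics) means adding one more listener; the publishing code stays unchanged.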

Examples

  • RabbitMQ: A popular open-source message broker that implements the Advanced Message Queuing Protocol (AMQP). It is widely used in enterprise applications for reliable message delivery, routing, and queuing, typically as a task queue, for asynchronous processing, or to integrate disparate systems.
  • Apache Kafka: A distributed streaming platform often used as a message broker. Kafka is designed for high-throughput, fault-tolerant data pipelines and streaming applications. It’s commonly used for real-time data analytics, log aggregation, and event sourcing, for example at LinkedIn and Netflix.
  • Amazon SQS (Simple Queue Service): A fully managed message queuing service offered by Amazon Web Services. It allows developers to decouple application components by using message queues to coordinate workflows. SQS is often used in serverless architectures and for building scalable, distributed systems.
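
Task queues, mentioned for several of the brokers above, differ from publish/subscribe in that each message is handled by exactly one worker. The sketch below illustrates that queue behavior in memory only; WorkQueue and worker are hypothetical names, and none of this is the API of RabbitMQ, Kafka, or SQS.

```dart
import 'dart:async';
import 'dart:collection';

/// Hypothetical in-memory work queue: each message goes to one receiver.
class WorkQueue {
  final _messages = Queue<String>();
  final _waiting = Queue<Completer<String>>();

  /// Hands the message to a waiting worker, or buffers it until one asks.
  void send(String message) {
    if (_waiting.isNotEmpty) {
      _waiting.removeFirst().complete(message);
    } else {
      _messages.addLast(message);
    }
  }

  /// Completes with the next message, waiting if the queue is empty.
  Future<String> receive() {
    if (_messages.isNotEmpty) {
      return Future.value(_messages.removeFirst());
    }
    final waiter = Completer<String>();
    _waiting.addLast(waiter);
    return waiter.future;
  }
}

/// A worker that takes a fixed number of jobs from the queue.
Future<void> worker(
    String name, WorkQueue queue, int jobs, List<String> log) async {
  for (var i = 0; i < jobs; i++) {
    final job = await queue.receive();
    log.add('$name handled $job');
  }
}

Future<void> main() async {
  final queue = WorkQueue();
  final log = <String>[];

  // Two workers compete for jobs from the same queue.
  final workers = [
    worker('worker A', queue, 2, log),
    worker('worker B', queue, 2, log),
  ];
  for (var i = 1; i <= 4; i++) {
    queue.send('job $i');
  }

  await Future.wait(workers);
  for (final line in log) {
    print(line);
  }
}
```

Adding workers spreads the load without any change on the sending side, which is why this shape suits background task processing.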

Specimens

15 implementations
Specimen 111.01 Dart

The Message Broker pattern decouples application components by having them communicate through an intermediary (the broker). Components, termed publishers and subscribers, don’t need to know about each other directly. Publishers send messages to the broker, and subscribers receive the messages they’ve registered for. This promotes scalability, flexibility, and resilience. In Dart, the pattern maps naturally onto Streams and StreamControllers, which handle asynchronous message passing. A StreamController acts as the broker; it must be a broadcast controller (StreamController.broadcast()) if more than one subscriber is to listen, because the stream of a default controller accepts only a single listener. Components can then publish and listen for events without tight coupling, relying on Dart’s asynchronous-first design and stream manipulation capabilities.

// message_broker.dart
import 'dart:async';

class MessageBroker {
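  // A broadcast controller lets several subscribers listen at once;
  // the default single-subscription controller throws on a second listen().
  final _controller = StreamController<String>.broadcast();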

  Stream<String> get stream => _controller.stream;

  void publish(String message) {
    _controller.add(message);
  }

  void close() {
    _controller.close();
  }
}

// publisher.dart
import 'message_broker.dart';

class Publisher {
  final MessageBroker broker;
  final String name;

  Publisher(this.broker, this.name);

  void sendMessage(String message) {
    broker.publish('$name: $message');
  }
}

// subscriber.dart
import 'dart:async';
import 'message_broker.dart';

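class Subscriber {
  final String name;
  final MessageBroker broker;
  late final StreamSubscription<String> _subscription;

  Subscriber(this.name, this.broker) {
    // Keep the subscription so the subscriber can unsubscribe later.
    _subscription = broker.stream.listen((message) {
      print('$name received: $message');
    });
  }

  // Stops delivery of further messages to this subscriber.
  Future<void> cancel() => _subscription.cancel();
}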

// main.dart
import 'message_broker.dart';
import 'publisher.dart';
import 'subscriber.dart';

void main() {
  final broker = MessageBroker();
  final publisher1 = Publisher(broker, 'News Source 1');
  final publisher2 = Publisher(broker, 'News Source 2');
  final subscriber1 = Subscriber('User A', broker);
  final subscriber2 = Subscriber('User B', broker);

  publisher1.sendMessage('Breaking news: Dart is awesome!');
  publisher2.sendMessage('Another update: Flutter is growing fast!');

  // Allow time for messages to be processed before closing
  Future.delayed(Duration(seconds: 2), () => broker.close());
}
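
The "stream manipulation capabilities" mentioned in the description can be illustrated with a subscriber that filters and transforms messages before handling them, then unsubscribes. This is a sketch under assumptions: it redefines a small broadcast-based MessageBroker so the example is self-contained, and the prefix-based filtering is just one hypothetical way to select messages.

```dart
import 'dart:async';

/// Minimal broadcast-based broker, redefined here for self-containment.
class MessageBroker {
  final _controller = StreamController<String>.broadcast();

  Stream<String> get stream => _controller.stream;

  void publish(String message) => _controller.add(message);

  Future<void> close() => _controller.close();
}

Future<void> main() async {
  final broker = MessageBroker();
  final seen = <String>[];
  const prefix = 'News Source 1: ';

  // Keep only messages from one source and strip the source prefix.
  final subscription = broker.stream
      .where((message) => message.startsWith(prefix))
      .map((message) => message.substring(prefix.length))
      .listen(seen.add);

  broker.publish('News Source 1: Dart is awesome!');
  broker.publish('News Source 2: Flutter is growing fast!');

  // Yield to the event loop so pending events reach the listener.
  await Future<void>.delayed(Duration.zero);

  // After cancelling, later messages are no longer delivered here.
  await subscription.cancel();
  broker.publish('News Source 1: nobody is listening now');

  await broker.close();
  print(seen); // [Dart is awesome!]
}
```

Because a broadcast stream does not buffer for absent listeners, the message published after cancel() is simply dropped; a broker that must not lose messages would need its own buffering or persistence.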