Publish-Subscribe
The Publish-Subscribe pattern defines one-to-many dependencies between objects. A publisher (or event source) doesn’t know about its subscribers. Instead, it publishes events to a broker (or message queue), and subscribers express interest in specific events by registering with the broker. When an event occurs, the broker efficiently delivers it to all registered subscribers.
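As a minimal sketch of the flow described above, assuming an in-process broker keyed by event name: the `Broker` class, its method names, and the event names are illustrative and not taken from any library.

```python
from collections import defaultdict
from typing import Any, Callable, DefaultDict, List

Handler = Callable[[Any], None]


class Broker:
    """Hypothetical in-process broker: maps event names to handlers."""

    def __init__(self) -> None:
        self._handlers: DefaultDict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, event: str, handler: Handler) -> None:
        # Subscribers register interest with the broker, not with a publisher.
        self._handlers[event].append(handler)

    def publish(self, event: str, payload: Any) -> int:
        # Copy the list so a handler may subscribe or unsubscribe while we iterate.
        handlers = list(self._handlers.get(event, ()))
        for handler in handlers:
            handler(payload)
        return len(handlers)  # number of subscribers that were notified


broker = Broker()
received: List[str] = []
broker.subscribe("price.changed", lambda p: received.append(f"ticker saw {p}"))
broker.subscribe("price.changed", lambda p: received.append(f"logger saw {p}"))

delivered = broker.publish("price.changed", 101.5)
ignored = broker.publish("order.placed", {"id": 1})  # nobody subscribed
print(received, delivered, ignored)
```

The publisher only ever talks to `broker`; adding or removing a subscriber requires no change on the publishing side.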
Usage
The Publish-Subscribe pattern is frequently used in scenarios requiring loose coupling and event-driven architectures. Common use cases include:
- Real-time updates: Applications needing to react immediately to changes (e.g., stock tickers, news feeds).
- Event logging and monitoring: Capturing and distributing system events for analysis and auditing.
- Decoupled microservices: Allowing services to communicate without direct dependencies.
- GUI frameworks: Notifying UI elements when underlying data changes.
- Messaging systems: Implementing asynchronous communication between applications and components.
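The GUI use case in the list above can be sketched as a model value that notifies registered views when it changes. `ObservableValue` and `on_change` are hypothetical names chosen for this illustration, and the "views" are plain callbacks rather than real widgets.

```python
from typing import Callable, Generic, List, TypeVar

T = TypeVar("T")


class ObservableValue(Generic[T]):
    """Hypothetical model object that notifies listeners when its value changes."""

    def __init__(self, initial: T) -> None:
        self._value = initial
        self._listeners: List[Callable[[T], None]] = []

    def on_change(self, listener: Callable[[T], None]) -> Callable[[], None]:
        self._listeners.append(listener)
        # Return a function that removes this listener again.
        return lambda: self._listeners.remove(listener)

    @property
    def value(self) -> T:
        return self._value

    @value.setter
    def value(self, new_value: T) -> None:
        if new_value == self._value:
            return  # unchanged values do not trigger a notification
        self._value = new_value
        for listener in list(self._listeners):
            listener(new_value)


rendered: List[str] = []
temperature = ObservableValue(20)
stop_label = temperature.on_change(lambda v: rendered.append(f"label: {v} C"))
temperature.on_change(lambda v: rendered.append(f"chart point: {v}"))

temperature.value = 21  # both views update
temperature.value = 21  # no change, no notification
stop_label()            # the label stops listening
temperature.value = 22  # only the chart updates
print(rendered)
```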
Examples
- Node.js EventEmitter: Node.js’s core EventEmitter class implements the Publish-Subscribe pattern. Modules can emit named events, and other modules can listen for those events using the on() method. The EventEmitter acts as the broker.
const EventEmitter = require('events');
const emitter = new EventEmitter();
// Subscriber
emitter.on('data', (data) => { console.log('Received data:', data); });
// Publisher
emitter.emit('data', { message: 'Hello, world!' });
- RxJS (Reactive Extensions for JavaScript): RxJS provides a powerful and flexible way to implement reactive programming, built on Observables and Observers. Observables play the role of publishers, and Observers play the role of subscribers. A plain Observable runs once per subscriber; an RxJS Subject is the variant that multicasts one stream of values to many Observers.
import { Observable } from 'rxjs';
// Publisher (Observable)
const observable = new Observable(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
});
// Subscriber (Observer), passed as an object because the multi-callback form of subscribe() is deprecated in RxJS 7
observable.subscribe({
next: value => console.log('value:', value),
error: error => console.log('error:', error),
complete: () => console.log('completed')
});
Specimens
15 implementations
The Publish-Subscribe (Pub/Sub) pattern decouples message senders (publishers) from message receivers (subscribers). Publishers don’t know who their subscribers are, and subscribers know only the broker they register with, not the publishers. A central message broker (often called a topic or event bus) manages message delivery.
This Dart implementation uses a Subject class to act as the message broker. Publishers call notify() on the subject, providing a message. Subscribers register by calling listen() on the subject’s stream, which returns a StreamSubscription. The StreamController must be a broadcast controller, because a default (single-subscription) stream throws a StateError when a second listener is added. Dart’s Streams and StreamController are naturally suited to this pattern and align with Dart’s asynchronous programming model.
// subject.dart
import 'dart:async';
class Subject {
// Broadcast controller: allows any number of listeners
final StreamController<String> _controller = StreamController<String>.broadcast();
Stream<String> get stream => _controller.stream;
void notify(String message) {
_controller.sink.add(message);
}
void close() {
_controller.close();
}
}
// main.dart
import 'subject.dart';
void main() {
final subject = Subject();
// Subscriber 1
subject.stream.listen((message) {
print('Subscriber 1 received: $message');
});
// Subscriber 2
subject.stream.listen((message) {
print('Subscriber 2 received: $message');
});
// Publish messages
subject.notify('Hello, Subscribers!');
subject.notify('Another message.');
// Clean up
subject.close();
}
The Publish-Subscribe (Pub/Sub) pattern decouples message publishers from message subscribers. Publishers don’t know who their subscribers are, and subscribers only know about the channels they’re interested in, not the publishers. This promotes loose coupling and scalability. Our Scala implementation uses a Subject (the message broker), which maintains a map from topic to subscribers and is responsible for dispatching messages to them. Subscribers register with the subject to receive messages on specific topics. We use a case class for message representation and immutable collections, so each update replaces the subscriber map instead of mutating it. The var that holds the map is not synchronized, so concurrent use would still need a lock or an atomic reference. Representing subscribers as plain functions is a functional and idiomatic approach.
// Publisher Subscriber (Pub/Sub) Pattern in Scala
// Define a simple message
case class Message(topic: String, content: String)
// Subject (Message Broker)
class Subject {
private var subscribers: Map[String, Seq[String => Unit]] = Map.empty
def subscribe(topic: String, callback: String => Unit): Unit = {
subscribers = subscribers + (topic -> (callback +: subscribers.getOrElse(topic, Seq.empty)))
}
def unsubscribe(topic: String, callback: String => Unit): Unit = {
val currentSubscribers = subscribers.getOrElse(topic, Seq.empty)
subscribers = subscribers + (topic -> currentSubscribers.filter(_ != callback))
}
def publish(message: Message): Unit = {
subscribers.get(message.topic).foreach { callbacks =>
callbacks.foreach(callback => callback(message.content))
}
}
}
// Example Usage
object PubSubExample {
def main(args: Array[String]): Unit = {
val subject = new Subject()
// Keep references to the callbacks so they can be unsubscribed later
val subscriber1: String => Unit = message => println(s"Subscriber 1 received: $message")
val subscriber2: String => Unit = message => println(s"Subscriber 2 received: $message")
subject.subscribe("news", subscriber1)
subject.subscribe("news", subscriber2)
subject.subscribe("sports", message => println(s"Subscriber 3 received: $message"))
// Publish messages
subject.publish(Message("news", "Breaking news: Scala is awesome!"))
subject.publish(Message("sports", "The home team won!"))
// Unsubscribe Subscriber 1 (this must be the same function instance that was subscribed;
// a new lambda with identical code would not compare equal)
subject.unsubscribe("news", subscriber1)
// Publish another message
subject.publish(Message("news", "Another news update."))
}
}
The Publish-Subscribe (Pub/Sub) pattern decouples message senders (publishers) from message receivers (subscribers). Publishers don’t know who their listeners are, and subscribers only know that messages of a certain type are published, not who publishes them. This promotes flexibility and scalability.
This PHP implementation is modeled on PHP’s built-in SplSubject and SplObserver interfaces, but it defines its own Subject and Observer interfaces, because SplObserver::update() accepts only the subject and cannot carry an event name or payload. A Subject maintains a list of Observer subscribers and notifies them through notify() when an event occurs. The event name and data are passed as arguments to the update() method of each observer, and each observer decides which events it acts on.
<?php
/**
 * Observer (Subscriber) Interface
 */
interface Observer {
    public function update(Subject $subject, string $event, $data = null): void;
}

/**
 * Subject (Publisher) Interface
 */
interface Subject {
    public function attach(Observer $observer): void;
    public function detach(Observer $observer): void;
    public function notify(string $event, $data = null): void;
}

/**
 * Concrete Subject: Event Dispatcher
 */
class EventDispatcher implements Subject {
    private $observers = [];

    public function attach(Observer $observer): void {
        $this->observers[] = $observer;
    }

    public function detach(Observer $observer): void {
        // Strict search compares object identity
        $index = array_search($observer, $this->observers, true);
        if ($index !== false) {
            unset($this->observers[$index]);
        }
    }

    public function notify(string $event, $data = null): void {
        foreach ($this->observers as $observer) {
            $observer->update($this, $event, $data);
        }
    }
}

/**
 * Concrete Observer: Logger (reacts to every event)
 */
class Logger implements Observer {
    public function update(Subject $subject, string $event, $data = null): void {
        echo "Logger: Event '$event' occurred with data: " . print_r($data, true) . "\n";
    }
}

/**
 * Concrete Observer: Email Sender (reacts only to 'user.registered')
 */
class EmailSender implements Observer {
    public function update(Subject $subject, string $event, $data = null): void {
        if ($event === 'user.registered') {
            echo "EmailSender: Sending welcome email to " . $data['email'] . "\n";
        }
    }
}
// Usage
$dispatcher = new EventDispatcher();
$logger = new Logger();
$emailSender = new EmailSender();
$dispatcher->attach($logger);
$dispatcher->attach($emailSender);
$dispatcher->notify('user.registered', ['email' => 'test@example.com', 'username' => 'testuser']);
$dispatcher->notify('order.placed', ['order_id' => 123, 'total' => 100]);
$dispatcher->detach($emailSender);
$dispatcher->notify('user.registered', ['email' => 'another@example.com', 'username' => 'anotheruser']);
?>
The Publish-Subscribe (Pub/Sub) pattern decouples message senders (publishers) from message receivers (subscribers). Publishers don’t know who is receiving messages, and subscribers don’t need to know where messages come from. A central message broker (in this case, a simple Ruby hash) manages subscriptions.
This Ruby implementation uses a hash to store subscribers for each topic. Publishers call publish with a topic and message, iterating through subscribers to notify them. Subscribers register with subscribe providing a topic and a callback. The use of blocks for callbacks is idiomatic Ruby, enabling concise and flexible event handling. The topics hash acts as the central registry, and the structure promotes code readability and ease of maintenance.
class Event
attr_reader :topic, :data
def initialize(topic, data)
@topic = topic
@data = data
end
end
class PubSub
def initialize
@topics = {}
end
def subscribe(topic, &callback)
@topics[topic] ||= []
@topics[topic] << callback
end
def publish(topic, data)
# fetch with a default avoids a NoMethodError when a topic has no subscribers
@topics.fetch(topic, []).each do |callback|
callback.call(Event.new(topic, data))
end
end
end
# Example Usage:
pub_sub = PubSub.new
pub_sub.subscribe("user.created") do |event|
puts "User created! Topic: #{event.topic}, Data: #{event.data}"
end
pub_sub.subscribe("user.created") do |event|
puts "Sending welcome email to: #{event.data[:email]}"
end
pub_sub.subscribe("order.placed") do |event|
puts "New Order! Order ID: #{event.data[:order_id]}"
end
pub_sub.publish("user.created", { username: "john_doe", email: "john.doe@example.com" })
pub_sub.publish("order.placed", { order_id: "12345", total: 100.00 })
The Publish-Subscribe (Pub/Sub) pattern decouples message senders (publishers) from message receivers (subscribers). Publishers don’t know who their subscribers are, and subscribers know only the notifications they register for, not the publishers behind them. A central message broker (often called a topic or event bus) manages message distribution.
This Swift implementation uses NotificationCenter, a built-in Pub/Sub mechanism that is a common way to broadcast events in Apple ecosystems. Publishers use post(name:object:) to send notifications, and subscribers register observers using addObserver(forName:object:queue:using:). That method returns a token, which the subscriber must keep and pass to removeObserver(_:) to stop observing. Notification.Name wraps the underlying string in a dedicated type, so names can be declared once as constants instead of being repeated as raw string literals.
import Foundation

// Publisher
class DataProvider {
    let dataDidChangeNotification = Notification.Name("DataDidChange")
    private var _data: String = "Initial Data"
    var data: String {
        get { return _data }
        set {
            _data = newValue
            NotificationCenter.default.post(name: dataDidChangeNotification, object: _data)
        }
    }
}

// Subscriber
class DataConsumer {
    // Token returned by the block-based API; needed to remove the observer later
    private var observerToken: NSObjectProtocol?

    init(dataProvider: DataProvider) {
        observerToken = NotificationCenter.default.addObserver(
            forName: dataProvider.dataDidChangeNotification,
            object: nil,
            queue: nil
        ) { [weak self] notification in
            // Capture self weakly so the stored block does not keep this object alive
            guard let newData = notification.object as? String else {
                print("Received data notification with invalid object.")
                return
            }
            self?.updateData(newData)
        }
    }

    func updateData(_ data: String) {
        print("Data updated: \(data)")
    }

    deinit {
        if let token = observerToken {
            NotificationCenter.default.removeObserver(token)
        }
    }
}
// Usage
let dataProvider = DataProvider()
let dataConsumer1 = DataConsumer(dataProvider: dataProvider)
let dataConsumer2 = DataConsumer(dataProvider: dataProvider)
dataProvider.data = "First Update"
dataProvider.data = "Second Update"
The Publish-Subscribe (Pub/Sub) pattern decouples message publishers from message subscribers. Publishers don’t know which subscribers exist, and subscribers only know about the messages they’re interested in. A central message broker (often called an event bus or dispatcher) manages message delivery.
The Kotlin code below implements a simple Pub/Sub system using a Topic class as the message broker. Publishers publish messages to a topic, and subscribers subscribe to receive them. We use Kotlin’s functional programming capabilities with extension functions to offer a clean subscribe API. The use of MutableList and forEach for subscribers aims for simplicity in this illustrative example; in a production environment, consider thread safety and more robust collection handling. Data classes improve code conciseness.
data class Message(val topic: String, val content: Any)
class Topic {
private val subscribers: MutableList<((Message) -> Unit)> = mutableListOf()
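// Member API: the callback receives the whole Message
fun subscribeToMessages(topic: String, callback: (Message) -> Unit): () -> Unit {
// Keep a reference to the filtering wrapper so that exactly this object can be removed later
val wrapper: (Message) -> Unit = { message ->
if (message.topic == topic) {
callback(message)
}
}
subscribers.add(wrapper)
return { subscribers.remove(wrapper) } // Return unsubscribe function
}
fun publish(message: Message) {
subscribers.forEach { it(message) }
}
}
// Extension API: the block receives only the message content.
// It is named differently from the member function because a member always wins over an extension with the same name.
fun Topic.subscribe(topic: String, block: (Any) -> Unit): () -> Unit {
return this.subscribeToMessages(topic) { msg -> block(msg.content) }
}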
fun main() {
val topic = Topic()
// Subscriber 1
val unsubscribe1 = topic.subscribe("news") { content ->
println("Subscriber 1 received news: $content")
}
// Subscriber 2
topic.subscribe("sports") { content ->
println("Subscriber 2 received sports: $content")
}
// Subscriber 3 (handles both news and sports)
val unsubscribe3 = topic.subscribe("news") { content ->
println("Subscriber 3 received news: $content")
}
topic.subscribe("sports") { content ->
println("Subscriber 3 received sports: $content")
}
topic.publish(Message("news", "Kotlin is awesome!"))
topic.publish(Message("sports", "Team A won the championship."))
topic.publish(Message("weather", "It's sunny today.")) // No subscribers
unsubscribe1() // Stop receiving news
topic.publish(Message("news", "Another Kotlin update!")) // Only Subscriber 3 receives this
}
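The thread-safety concern raised for the Kotlin example can be sketched as follows, here in Python and under the assumption that a single lock around the subscriber lists is acceptable. `ThreadSafeTopic` and `record` are hypothetical names used only for this illustration.

```python
import threading
from collections import defaultdict
from typing import Callable, DefaultDict, List

Callback = Callable[[str], None]


class ThreadSafeTopic:
    """Hypothetical broker whose subscriber lists are guarded by a lock."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._subscribers: DefaultDict[str, List[Callback]] = defaultdict(list)

    def subscribe(self, topic: str, callback: Callback) -> Callable[[], None]:
        with self._lock:
            self._subscribers[topic].append(callback)

        def unsubscribe() -> None:
            with self._lock:
                if callback in self._subscribers[topic]:
                    self._subscribers[topic].remove(callback)

        return unsubscribe

    def publish(self, topic: str, content: str) -> None:
        # Take a snapshot under the lock, then call back outside it so a
        # callback can subscribe or unsubscribe without deadlocking.
        with self._lock:
            snapshot = list(self._subscribers.get(topic, ()))
        for callback in snapshot:
            callback(content)


topic = ThreadSafeTopic()
seen: List[str] = []
seen_lock = threading.Lock()


def record(content: str) -> None:
    with seen_lock:
        seen.append(content)


unsubscribe = topic.subscribe("news", record)
workers = [
    threading.Thread(target=topic.publish, args=("news", f"update {i}"))
    for i in range(8)
]
for worker in workers:
    worker.start()
for worker in workers:
    worker.join()

unsubscribe()
topic.publish("news", "ignored")  # no subscribers remain
print(sorted(seen))
```

Callbacks run on the publishing thread, so any state they touch (here the `seen` list) needs its own protection.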
The Publish-Subscribe (Pub/Sub) pattern decouples message senders (publishers) from message receivers (subscribers). Publishers emit messages to a topic without knowing who, if anyone, is listening. Subscribers express interest in specific topics and receive messages published to those topics. This promotes loose coupling and scalability.
This Rust implementation uses the crossbeam-channel crate for thread-safe communication. A Publisher holds, for each topic, a vector of channel senders, one per subscriber. Publishing sends a copy of the message to every sender registered for the topic. A Subscriber registers with the publisher and receives messages on its own dedicated channel. The use of channels provides a concurrent and safe way to distribute messages, which fits Rust’s emphasis on ownership and concurrency safety.
use crossbeam_channel::{unbounded, Receiver, Sender};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

struct Publisher {
    // Topic name -> one sender per subscriber of that topic
    subscribers: Arc<Mutex<HashMap<String, Vec<Sender<String>>>>>,
}

impl Publisher {
    fn new() -> Self {
        Publisher {
            subscribers: Arc::new(Mutex::new(HashMap::new())),
        }
    }

    fn subscribe(&self, topic: String) -> Receiver<String> {
        let (sender, receiver) = unbounded();
        let mut subscribers = self.subscribers.lock().unwrap();
        // Append instead of overwrite, so a topic can have many subscribers
        subscribers.entry(topic).or_default().push(sender);
        receiver
    }

    fn publish(&self, topic: String, message: String) {
        let subscribers = self.subscribers.lock().unwrap();
        if let Some(senders) = subscribers.get(&topic) {
            for sender in senders {
                // A send error only means that subscriber dropped its receiver
                let _ = sender.send(message.clone());
            }
        }
    }
}

struct Subscriber {
    topic: String,
    receiver: Receiver<String>,
}

impl Subscriber {
    fn new(topic: String, receiver: Receiver<String>) -> Self {
        Subscriber { topic, receiver }
    }

    // Non-blocking: returns None when no message is waiting
    fn receive(&self) -> Option<String> {
        self.receiver.try_recv().ok()
    }
}

fn main() {
    let publisher = Publisher::new();
    let receiver1 = publisher.subscribe("news".to_string());
    let subscriber1 = Subscriber::new("news".to_string(), receiver1);
    let receiver2 = publisher.subscribe("sports".to_string());
    let subscriber2 = Subscriber::new("sports".to_string(), receiver2);
    let receiver3 = publisher.subscribe("news".to_string());
    let subscriber3 = Subscriber::new("news".to_string(), receiver3);

    publisher.publish("news".to_string(), "Breaking news: Rust is awesome!".to_string());
    publisher.publish("sports".to_string(), "Local team wins championship!".to_string());

    if let Some(msg) = subscriber1.receive() {
        println!("Subscriber 1 ({}) received: {}", subscriber1.topic, msg);
    }
    if let Some(msg) = subscriber2.receive() {
        println!("Subscriber 2 ({}) received: {}", subscriber2.topic, msg);
    }
    if let Some(msg) = subscriber3.receive() {
        println!("Subscriber 3 ({}) received: {}", subscriber3.topic, msg);
    }
}
The Publish-Subscribe (Pub/Sub) pattern enables decoupling of message senders (publishers) and message receivers (subscribers). Publishers emit events without knowing who, if anyone, is listening. Subscribers express interest in specific events and are notified when those events occur. This promotes loose coupling, making systems more flexible and maintainable.
The Go implementation uses goroutines and channels for asynchronous communication. A central EventBus struct holds a map from event type to the channels of that type’s subscribers. Publishers use the Publish method to send an event to every channel registered for its type. Subscribers use the Subscribe method to obtain a dedicated channel on which they receive events from the bus. This approach is idiomatic Go due to its concurrency primitives and emphasis on explicit communication through channels.
package main

import (
	"fmt"
	"sync"
	"time"
)

// EventBus manages subscriptions and publishes events.
type EventBus struct {
	mu            sync.RWMutex
	subscriptions map[string][]chan interface{}
}

// NewEventBus creates a new EventBus.
func NewEventBus() *EventBus {
	return &EventBus{
		subscriptions: make(map[string][]chan interface{}),
	}
}

// Subscribe registers a subscriber to an event type and returns its dedicated channel.
func (eb *EventBus) Subscribe(eventType string) <-chan interface{} {
	eb.mu.Lock()
	defer eb.mu.Unlock()
	ch := make(chan interface{})
	// Append instead of overwrite, so an event type can have many subscribers
	eb.subscriptions[eventType] = append(eb.subscriptions[eventType], ch)
	return ch
}

// Publish sends an event to all subscribers of a given type.
func (eb *EventBus) Publish(eventType string, event interface{}) {
	eb.mu.RLock()
	defer eb.mu.RUnlock()
	for _, ch := range eb.subscriptions[eventType] {
		// Send from a goroutine so a slow subscriber cannot block the publisher.
		// Delivery order across separate Publish calls is therefore not guaranteed.
		go func(c chan interface{}) { c <- event }(ch)
	}
}

func main() {
	bus := NewEventBus()

	// Subscribe to "user.created" events
	userCreated := bus.Subscribe("user.created")
	go func() {
		for event := range userCreated {
			fmt.Printf("User Created Event: %v\n", event)
		}
	}()

	// Subscribe to "order.placed" events
	orderPlaced := bus.Subscribe("order.placed")
	go func() {
		for event := range orderPlaced {
			fmt.Printf("Order Placed Event: %v\n", event)
		}
	}()

	// Publish some events
	bus.Publish("user.created", map[string]interface{}{"id": 1, "name": "Alice"})
	bus.Publish("order.placed", map[string]interface{}{"order_id": "123", "user_id": 1, "total": 100.0})
	bus.Publish("user.created", map[string]interface{}{"id": 2, "name": "Bob"})

	time.Sleep(1 * time.Second) // Allow time for event processing
}
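The idea of one dedicated channel per subscriber can also be sketched in Python with `queue.Queue` and threads. `QueueBus`, `consume`, and the `_STOP` sentinel are hypothetical names for this illustration. Unlike a send from a fresh goroutine per event, each queue here preserves publication order for its subscriber.

```python
import queue
import threading
from collections import defaultdict
from typing import Any, DefaultDict, List

_STOP = object()  # sentinel telling a consumer thread to exit


class QueueBus:
    """Hypothetical bus that gives every subscriber its own queue."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._queues: DefaultDict[str, List[queue.Queue]] = defaultdict(list)

    def subscribe(self, event_type: str) -> queue.Queue:
        q: queue.Queue = queue.Queue()
        with self._lock:
            self._queues[event_type].append(q)
        return q

    def publish(self, event_type: str, event: Any) -> None:
        with self._lock:
            targets = list(self._queues.get(event_type, ()))
        for q in targets:
            q.put(event)  # unbounded queue, so put() does not block

    def close(self) -> None:
        # Ask every consumer to stop once it has drained its queue.
        with self._lock:
            targets = [q for qs in self._queues.values() for q in qs]
        for q in targets:
            q.put(_STOP)


def consume(q: queue.Queue, sink: List[Any]) -> None:
    while True:
        event = q.get()
        if event is _STOP:
            return
        sink.append(event)


bus = QueueBus()
first: List[Any] = []
second: List[Any] = []
threads = [
    threading.Thread(target=consume, args=(bus.subscribe("user.created"), first)),
    threading.Thread(target=consume, args=(bus.subscribe("user.created"), second)),
]
for t in threads:
    t.start()

bus.publish("user.created", {"id": 1, "name": "Alice"})
bus.publish("user.created", {"id": 2, "name": "Bob"})
bus.close()
for t in threads:
    t.join()
print(first == second, [e["name"] for e in first])
```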
The Publish-Subscribe (Pub/Sub) pattern decouples message senders (publishers) from message receivers (subscribers). Publishers announce events without knowing who or how many subscribers exist. Subscribers express interest in one or more events, and only receive notifications for those they’ve subscribed to. This implementation uses a simple linked list to store subscribers for each event type (represented as strings). A central EventManager manages subscriptions and dispatches events to the appropriate subscribers. C’s function pointers are used to represent subscriber callbacks, allowing for generic event handling. This approach avoids tight coupling and promotes extensibility.
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define BUCKET_COUNT 1024

// Subscriber callback: receives the subscriber's own context and the published data
typedef void (*event_callback)(void *context, void *data);

// A subscriber entry in a bucket's linked list
typedef struct subscriber {
    const char *event;       // event name; the caller must keep this string alive
    event_callback callback;
    void *context;
    struct subscriber *next;
} subscriber_t;

// The event manager: a fixed-size hash table of subscriber lists
typedef struct event_manager {
    subscriber_t *buckets[BUCKET_COUNT];
} event_manager_t;

// djb2 string hash reduced to a bucket index. Hashing only the length would put
// "user.created" and "order.placed" (both 12 characters) in the same bucket.
static size_t hash_event(const char *event) {
    unsigned long hash = 5381;
    while (*event) {
        hash = hash * 33 + (unsigned char)*event++;
    }
    return hash % BUCKET_COUNT;
}

// Create a new event manager
event_manager_t *event_manager_create(void) {
    event_manager_t *manager = malloc(sizeof(event_manager_t));
    if (manager) {
        for (int i = 0; i < BUCKET_COUNT; i++) {
            manager->buckets[i] = NULL;
        }
    }
    return manager;
}

// Subscribe to an event
void event_manager_subscribe(event_manager_t *manager, const char *event,
                             event_callback callback, void *context) {
    size_t hash = hash_event(event);
    subscriber_t *new_subscriber = malloc(sizeof(subscriber_t));
    if (new_subscriber) {
        new_subscriber->event = event;
        new_subscriber->callback = callback;
        new_subscriber->context = context;
        new_subscriber->next = manager->buckets[hash];
        manager->buckets[hash] = new_subscriber;
    }
}

// Unsubscribe the first matching (event, callback) pair
void event_manager_unsubscribe(event_manager_t *manager, const char *event,
                               event_callback callback) {
    size_t hash = hash_event(event);
    subscriber_t *current = manager->buckets[hash];
    subscriber_t *prev = NULL;
    while (current) {
        if (current->callback == callback && strcmp(current->event, event) == 0) {
            if (prev) {
                prev->next = current->next;
            } else {
                manager->buckets[hash] = current->next;
            }
            free(current);
            return;
        }
        prev = current;
        current = current->next;
    }
}

// Publish an event to the subscribers registered under exactly this name
void event_manager_publish(event_manager_t *manager, const char *event, void *data) {
    subscriber_t *current = manager->buckets[hash_event(event)];
    while (current) {
        // Different names can share a bucket, so compare the names too
        if (strcmp(current->event, event) == 0) {
            current->callback(current->context, data);
        }
        current = current->next;
    }
}

// Free the event manager and all its subscribers
void event_manager_free(event_manager_t *manager) {
    for (int i = 0; i < BUCKET_COUNT; i++) {
        subscriber_t *current = manager->buckets[i];
        while (current) {
            subscriber_t *next = current->next;
            free(current);
            current = next;
        }
    }
    free(manager);
}

// Example usage
void log_event(void *context, void *data) {
    printf("[%s] received: %s\n", (char *)context, (char *)data);
}

int main(void) {
    event_manager_t *manager = event_manager_create();
    if (!manager) {
        return 1;
    }
    event_manager_subscribe(manager, "user.created", log_event, "User created event");
    event_manager_subscribe(manager, "user.created", log_event, "Another user created event");
    event_manager_subscribe(manager, "order.placed", log_event, "Order placed event");
    event_manager_publish(manager, "user.created", "User data");
    event_manager_publish(manager, "order.placed", "Order details");
    event_manager_free(manager);
    return 0;
}
The Publish-Subscribe (Pub/Sub) pattern decouples message publishers from message subscribers. Publishers emit messages to a topic without knowing who (if anyone) is listening. Subscribers express interest in specific topics and receive notifications when new messages are published to those topics. This implementation represents subscribers as callable objects and uses a central EventBus to manage subscriptions and message delivery. Using std::function allows for flexible subscriber registration (lambdas, bound member functions, etc.). The code follows modern C++ practices by using std::vector for storage and range-based for loops for iteration.
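#include <algorithm>
#include <cstddef>
#include <functional>
#include <iostream>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

class EventBus {
public:
    using Callback = std::function<void(const std::string&)>;
    using SubscriptionId = std::size_t;

    // std::function objects cannot be compared for equality, so each subscription
    // gets an id that the caller passes back to unsubscribe().
    SubscriptionId subscribe(const std::string& eventType, Callback callback) {
        const SubscriptionId id = nextId_++;
        subscribers_[eventType].push_back({id, std::move(callback)});
        return id;
    }

    void unsubscribe(const std::string& eventType, SubscriptionId id) {
        auto it = subscribers_.find(eventType);
        if (it == subscribers_.end()) return;
        auto& subs = it->second;
        subs.erase(std::remove_if(subs.begin(), subs.end(),
                                  [id](const Entry& entry) { return entry.id == id; }),
                   subs.end());
    }

    void publish(const std::string& eventType, const std::string& data) const {
        auto it = subscribers_.find(eventType);
        if (it == subscribers_.end()) return;  // no subscribers for this event type
        for (const auto& entry : it->second) {
            entry.callback(data);
        }
    }

private:
    struct Entry {
        SubscriptionId id;
        Callback callback;
    };
    SubscriptionId nextId_ = 0;
    std::unordered_map<std::string, std::vector<Entry>> subscribers_;
};

// Example Usage
class MySubscriber {
public:
    void onEvent(const std::string& data) {
        std::cout << "MySubscriber received event: " << data << std::endl;
    }
};

int main() {
    EventBus eventBus;
    MySubscriber subscriber;

    // Subscribe using a lambda
    const auto lambdaId = eventBus.subscribe("user.created", [](const std::string& data) {
        std::cout << "Lambda Subscriber received event: " << data << std::endl;
    });
    // Subscribe using a bound member function
    eventBus.subscribe("user.created",
                       std::bind(&MySubscriber::onEvent, &subscriber, std::placeholders::_1));

    // Publish events
    eventBus.publish("user.created", "New user: Alice");
    eventBus.publish("user.updated", "User profile updated");  // no subscribers

    // Remove the lambda subscriber; only MySubscriber receives the next event
    eventBus.unsubscribe("user.created", lambdaId);
    eventBus.publish("user.created", "New user: Bob");
    return 0;
}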
The Publish-Subscribe (Pub/Sub) pattern decouples message publishers from subscribers. Publishers don’t know who their subscribers are, and subscribers interact only with a central message broker (in this case, a simple event aggregator), never with the publishers directly. This promotes loose coupling and extensibility.
This C# implementation uses generic delegates (Action<TEvent>) to create a basic event system; it does not use the event keyword. The EventAggregator class holds a list of subscribers for each event name. Publishers call Publish to trigger events, and subscribers call Subscribe for the events they are interested in. Handlers are strongly typed, but the link between an event name and its payload type is checked only at run time: publishing a payload to a handler that was registered for an incompatible type throws an InvalidCastException.
// EventAggregator.cs
using System;
using System.Collections.Generic;
public class EventAggregator
{
private readonly Dictionary<string, List<Delegate>> _eventSubscribers = new();
public void Subscribe<TEvent>(string eventName, Action<TEvent> subscriber)
{
_eventSubscribers.TryGetValue(eventName, out var subscribers);
subscribers ??= new List<Delegate>();
subscribers.Add(subscriber);
_eventSubscribers[eventName] = subscribers;
}
public void Publish<TEvent>(string eventName, TEvent args)
{
if (_eventSubscribers.TryGetValue(eventName, out var subscribers))
{
foreach (var subscriber in subscribers)
{
((Action<TEvent>)subscriber)(args);
}
}
}
}
// Example Usage
// Subscriber.cs
public class OrderService
{
public void HandleOrderCreated(OrderCreatedEvent args)
{
Console.WriteLine($"Order created: {args.OrderId}, Customer: {args.CustomerName}");
}
}
// Publisher.cs
public class OrderProcessor
{
private readonly EventAggregator _eventAggregator;
private readonly OrderService _orderService;
public OrderProcessor(EventAggregator eventAggregator, OrderService orderService)
{
_eventAggregator = eventAggregator;
_orderService = orderService;
}
public void CreateOrder(int orderId, string customerName)
{
// Process order...
Console.WriteLine($"Processing order {orderId} for {customerName}");
// Publish the event
_eventAggregator.Publish("OrderCreated", new OrderCreatedEvent(orderId, customerName));
}
}
// Event Data
public class OrderCreatedEvent
{
public int OrderId { get; }
public string CustomerName { get; }
public OrderCreatedEvent(int orderId, string customerName)
{
OrderId = orderId;
CustomerName = customerName;
}
}
// Program.cs
public class Program
{
public static void Main(string[] args)
{
var eventAggregator = new EventAggregator();
var orderService = new OrderService();
var orderProcessor = new OrderProcessor(eventAggregator, orderService);
// Subscribe to the event
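// The type argument is given explicitly because TEvent cannot be inferred from a method group
eventAggregator.Subscribe<OrderCreatedEvent>("OrderCreated", orderService.HandleOrderCreated);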
// Create an order
orderProcessor.CreateOrder(123, "Alice");
}
}
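One way to tie an event to its payload type, rather than pairing a string name with a separately chosen type, is to key subscriptions by the event class itself. The following is a sketch of that alternative in Python, not the aggregator shown above; `TypedEventAggregator`, `OrderCreated`, and `OrderCancelled` are hypothetical names.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Callable, DefaultDict, List, Type, TypeVar

E = TypeVar("E")


@dataclass(frozen=True)
class OrderCreated:
    order_id: int
    customer_name: str


@dataclass(frozen=True)
class OrderCancelled:
    order_id: int


class TypedEventAggregator:
    """Hypothetical aggregator keyed by event class rather than by a string name."""

    def __init__(self) -> None:
        self._handlers: DefaultDict[type, List[Callable]] = defaultdict(list)

    def subscribe(self, event_type: Type[E], handler: Callable[[E], None]) -> None:
        self._handlers[event_type].append(handler)

    def publish(self, event: object) -> None:
        # Dispatch on the exact runtime class of the event.
        for handler in list(self._handlers.get(type(event), ())):
            handler(event)


log: List[str] = []
aggregator = TypedEventAggregator()
aggregator.subscribe(
    OrderCreated, lambda e: log.append(f"created {e.order_id} for {e.customer_name}")
)
aggregator.subscribe(OrderCancelled, lambda e: log.append(f"cancelled {e.order_id}"))

aggregator.publish(OrderCreated(123, "Alice"))
aggregator.publish(OrderCancelled(123))
print(log)
```

Because the handler list is looked up by `type(event)`, a handler can only ever receive instances of the class it subscribed to. Subclasses are not matched in this sketch.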
The Publish-Subscribe (Pub/Sub) pattern decouples message publishers from message subscribers. Publishers don’t know who their subscribers are, and subscribers only know about topics they’re interested in, not who is publishing. This allows for loose coupling and scalability.
The code implements a simple Pub/Sub system using a generic PubSub<T> class. It maintains a list of subscribers (callbacks) for each topic (a string). publish() delivers a message to all subscribers of a topic, and subscribe() and unsubscribe() manage the subscriber list. A Subscriber<T> function type, used in place of the untyped Function and any, lets the compiler check that every handler accepts the payload type that is published.
// A subscriber is a callback that accepts the published payload
type Subscriber<T> = (data: T) => void;
class PubSub<T> {
private subscribers: { [topic: string]: Subscriber<T>[] } = {};
subscribe(topic: string, subscriber: Subscriber<T>): void {
if (!this.subscribers[topic]) {
this.subscribers[topic] = [];
}
this.subscribers[topic].push(subscriber);
}
unsubscribe(topic: string, subscriber: Subscriber<T>): void {
this.subscribers[topic] = this.subscribers[topic]?.filter(
(sub) => sub !== subscriber
) || [];
}
publish(topic: string, data: T): void {
if (this.subscribers[topic]) {
this.subscribers[topic].forEach((subscriber) => subscriber(data));
}
}
}
// Example Usage
const pubSub = new PubSub<string>();
const newsFeedSubscriber = (news: string) => {
console.log("News Feed:", news);
};
const weatherSubscriber = (weather: string) => {
console.log("Weather Update:", weather);
};
pubSub.subscribe("news", newsFeedSubscriber);
pubSub.subscribe("weather", weatherSubscriber);
pubSub.publish("news", "Breaking: TypeScript 4.9 released!");
pubSub.publish("weather", "Sunny with a high of 75°F.");
pubSub.unsubscribe("news", newsFeedSubscriber);
pubSub.publish("news", "Another news item"); // Will not log.
The Publish-Subscribe (Pub/Sub) pattern enables decoupling of message producers (publishers) from message consumers (subscribers). Publishers emit events without knowing who, if anyone, is listening. Subscribers express interest in specific events and receive notifications when those events occur. This promotes loose coupling and scalability.
This JavaScript implementation uses a simple object to manage topics and subscriptions. Publishers call publish with a topic and data. Subscribers call subscribe with a topic and a callback function. The publish function iterates through the subscribers for the given topic and invokes their callbacks with the provided data. This approach is idiomatic JavaScript due to its reliance on first-class functions and flexible object structure, avoiding the need for complex class hierarchies.
class PubSub {
constructor() {
this.topics = {};
}
subscribe(topic, callback) {
if (!this.topics[topic]) {
this.topics[topic] = [];
}
this.topics[topic].push(callback);
return () => { // Return an unsubscribe function
this.topics[topic] = this.topics[topic].filter(cb => cb !== callback);
};
}
publish(topic, data) {
if (!this.topics[topic] || this.topics[topic].length === 0) {
return;
}
this.topics[topic].forEach(callback => {
callback(data);
});
}
}
// Example Usage
const pubSub = new PubSub();
const logData = (data) => {
console.log('Log:', data);
};
const alertData = (data) => {
alert('Alert: ' + data);
};
const unsubscribeLog = pubSub.subscribe('data', logData);
pubSub.subscribe('data', alertData);
pubSub.publish('data', 'Hello, world!');
unsubscribeLog(); // Stop logging
pubSub.publish('data', 'Another message!'); // Only alerts will be shown
The Publish-Subscribe (Pub/Sub) pattern decouples message senders (publishers) from message receivers (subscribers). Publishers don’t know who their subscribers are, and subscribers only know about certain types of messages, not who is sending them. This is achieved through a message broker (often called a topic or channel). This implementation uses a dictionary to store subscribers for each topic. Publishers call publish() with a topic and message, which iterates through the subscribers and calls their update methods. This approach is Pythonic due to its use of dictionaries for flexible data storage and the reliance on duck typing – subscribers are expected to have an update method, regardless of their class.
class Publisher:
    def __init__(self):
        self.subscribers = {}

    def subscribe(self, topic, subscriber):
        if topic not in self.subscribers:
            self.subscribers[topic] = []
        self.subscribers[topic].append(subscriber)

    def unsubscribe(self, topic, subscriber):
        if topic in self.subscribers:
            try:
                self.subscribers[topic].remove(subscriber)
            except ValueError:
                pass  # Subscriber not found in this topic

    def publish(self, topic, message):
        if topic in self.subscribers:
            for subscriber in self.subscribers[topic]:
                subscriber.update(message)


class Subscriber:
    def __init__(self, name):
        self.name = name

    def update(self, message):
        print(f"{self.name} received: {message}")


if __name__ == "__main__":
    publisher = Publisher()
    subscriber1 = Subscriber("Subscriber 1")
    subscriber2 = Subscriber("Subscriber 2")
    subscriber3 = Subscriber("Subscriber 3")
    publisher.subscribe("news", subscriber1)
    publisher.subscribe("news", subscriber2)
    publisher.subscribe("sports", subscriber3)
    publisher.publish("news", "Breaking news: Python 3.13 released!")
    publisher.publish("sports", "Local team wins championship!")
    publisher.publish("weather", "Sunny today")  # No subscribers for this topic
    publisher.unsubscribe("news", subscriber1)
    publisher.publish("news", "Another news item")  # Only subscriber2 receives
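The duck-typed contract described above (any object with an update method can subscribe) can be written down with `typing.Protocol`. This is a sketch: `SupportsUpdate`, `EmailNotifier`, `AuditLog`, and `notify_all` are hypothetical names and are not part of the implementation shown.

```python
from typing import List, Protocol, runtime_checkable


@runtime_checkable
class SupportsUpdate(Protocol):
    """Structural type: anything with a compatible update() method."""

    def update(self, message: str) -> None: ...


class EmailNotifier:
    def __init__(self) -> None:
        self.sent: List[str] = []

    def update(self, message: str) -> None:
        self.sent.append(f"email: {message}")


class AuditLog:
    def __init__(self) -> None:
        self.entries: List[str] = []

    def update(self, message: str) -> None:
        self.entries.append(message)


def notify_all(subscribers: List[SupportsUpdate], message: str) -> None:
    # Neither class inherits from SupportsUpdate; having update() is enough.
    for subscriber in subscribers:
        subscriber.update(message)


notifier, audit = EmailNotifier(), AuditLog()
notify_all([notifier, audit], "Breaking news")

print(notifier.sent, audit.entries)
# runtime_checkable only checks that the method exists, not its signature.
print(isinstance(notifier, SupportsUpdate), isinstance("text", SupportsUpdate))
```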
The Publish-Subscribe (Pub/Sub) pattern decouples message senders (publishers) from message receivers (subscribers). Publishers emit messages to a topic without knowing who, if anyone, is listening. Subscribers express interest in specific topics and receive messages published to those topics. This promotes loose coupling and scalability.
This Java implementation uses the java.util.Observable and java.util.Observer classes. An EventManager that extends Observable acts as the message broker (subject), and Observer implementations are the subscribers. Publishers call publish(), which invokes notifyObservers() with the message, and the update() method of every Observer registered via subscribe() is then called. This version has a single channel and no topics, so every subscriber receives every message. Observable and Observer have been deprecated since Java 9, so the approach suits only simple or legacy Pub/Sub scenarios. More complex implementations might use libraries like Guava EventBus or reactive frameworks.
import java.util.Observable;
import java.util.Observer;
import java.util.ArrayList;
import java.util.List;
// Observable (Subject/Message Broker)
class EventManager extends Observable {
private List<Observer> observers = new ArrayList<>();
public void subscribe(Observer observer) {
if (!observers.contains(observer)) {
addObserver(observer);
observers.add(observer);
}
}
public void unsubscribe(Observer observer) {
observers.remove(observer);
deleteObserver(observer);
}
public void publish(String message) {
setChanged(); // Indicate a change of state
notifyObservers(message); // Notify all observers
}
}
// Observer (Subscriber)
interface EventListener extends Observer {
void onEvent(String message);
}
class ConcreteListener implements EventListener {
private String listenerName;
public ConcreteListener(String name) {
this.listenerName = name;
}
@Override
public void update(Observable o, Object arg) {
onEvent((String) arg);
}
@Override
public void onEvent(String message) {
System.out.println(listenerName + " received: " + message);
}
}
// Publisher
public class PubSubExample {
public static void main(String[] args) {
EventManager eventManager = new EventManager();
ConcreteListener listener1 = new ConcreteListener("Listener 1");
ConcreteListener listener2 = new ConcreteListener("Listener 2");
eventManager.subscribe(listener1);
eventManager.subscribe(listener2);
eventManager.publish("Hello, Pub/Sub!");
eventManager.publish("Another event!");
eventManager.unsubscribe(listener2);
eventManager.publish("Event after unsubscribe.");
}
}