CODESAMPLE

Cluster-based Architecture - C

Share on:

The Cluster-based Architecture pattern involves grouping similar components (clusters) to handle tasks, enhancing modularity and potential for parallelism. This example simulates a worker pool using a cluster of worker threads. A central task queue holds work items, and worker threads pull from the queue and process them. It provides a degree of concurrency without requiring complex thread management at the caller level. This structure mirrors a distributed system where clusters of servers handle related requests. C’s efficient memory management and direct thread control make it suitable for implementing such a low-level architecture. The use of a shared queue and mutex ensures thread-safe access to tasks.

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>

#define NUM_WORKERS 4
#define QUEUE_SIZE 10

typedef struct {
  int id;
  char* data;
} WorkItem;

WorkItem work_queue[QUEUE_SIZE];
int queue_head = 0;
int queue_tail = 0;
pthread_mutex_t queue_mutex;

void* worker_thread(void* arg) {
  int worker_id = *(int*)arg;
  WorkItem* item;

  while (1) {
    pthread_mutex_lock(&queue_mutex);
    if (queue_head != queue_tail) {
      item = &work_queue[queue_head++];
      pthread_mutex_unlock(&queue_mutex);

      printf("Worker %d processing item %d: %s\n", worker_id, item->id, item->data);
      free(item->data); // Free allocated data
      item->data = NULL;
    } else {
      pthread_mutex_unlock(&queue_mutex);
      // Queue is empty.  Sleep to avoid busy-waiting.  Better would be a condition variable.
      usleep(10000); // 10ms
    }
  }

  return NULL;
}


int main() {
  pthread_t workers[NUM_WORKERS];
  int worker_ids[NUM_WORKERS];

  pthread_mutex_init(&queue_mutex, NULL);

  // Create worker threads
  for (int i = 0; i < NUM_WORKERS; i++) {
    worker_ids[i] = i;
    if (pthread_create(&workers[i], NULL, worker_thread, &worker_ids[i]) != 0) {
      perror("Thread creation failed");
      return 1;
    }
  }

  // Add work to the queue
  for (int i = 0; i < 20; i++) {
    char* data = (char*)malloc(50);
    snprintf(data, 50, "Task %d data", i);

    pthread_mutex_lock(&queue_mutex);
    if ((queue_tail + 1) % QUEUE_SIZE == queue_head) {
      pthread_mutex_unlock(&queue_mutex);
      printf("Queue is full, dropping task %d\n", i);
      free(data);
      continue;
    }
    work_queue[queue_tail].id = i;
    work_queue[queue_tail].data = data;
    queue_tail = (queue_tail + 1) % QUEUE_SIZE;
    pthread_mutex_unlock(&queue_mutex);
    usleep(50000); // Add a small delay to simulate work creation rate
  }

  // Allow workers to finish (in a real system, you'd have a proper shutdown mechanism)
  sleep(5);


  pthread_mutex_destroy(&queue_mutex);
  return 0;
}