Asynchronous messaging decouples services by allowing them to communicate without waiting for immediate responses. Instead of direct synchronous calls, services exchange messages through intermediaries like message queues or event buses. This pattern improves resilience, scalability, and flexibility in distributed systems.
When services communicate synchronously, they're coupled in time; both must be available simultaneously. Asynchronous messaging removes this coupling. The sender doesn't wait for the receiver. If the receiver is temporarily unavailable, messages queue until it recovers. This temporal decoupling is fundamental to building resilient systems.
Message Queues vs Event Buses
Message queues provide point-to-point communication. A message is delivered to exactly one consumer, even if multiple consumers exist. Queues are appropriate when work needs to be processed exactly once.
// Publishing to a queue
class OrderService
{
public function createOrder(array $data): Order
{
$order = Order::create($data);
// Queue work for single consumer
Queue::push(new ProcessOrderPayment($order->id));
return $order;
}
}
// Consuming from queue
class ProcessOrderPayment implements ShouldQueue
{
public function handle(): void
{
$order = Order::findOrFail($this->orderId);
$this->paymentService->charge($order);
}
}
Event buses (pub/sub) provide one-to-many communication. Multiple subscribers receive each event. Events are appropriate when multiple services need to react to the same occurrence.
// Publishing an event
class OrderService
{
public function completeOrder(Order $order): void
{
$order->update(['status' => 'completed']);
// Multiple subscribers will receive this
Event::dispatch(new OrderCompleted($order));
}
}
// Multiple subscribers
class SendOrderConfirmation
{
public function handle(OrderCompleted $event): void
{
Mail::send(new OrderConfirmationMail($event->order));
}
}
class UpdateInventory
{
public function handle(OrderCompleted $event): void
{
foreach ($event->order->items as $item) {
$this->inventoryService->decrease($item->sku, $item->quantity);
}
}
}
class NotifyWarehouse
{
public function handle(OrderCompleted $event): void
{
$this->warehouseApi->createShipment($event->order);
}
}
Message Patterns
Request-reply enables async communication that still returns a result. The requester includes a reply-to address; the responder sends the result there.
class AsyncRequestService
{
public function request(string $queue, array $payload): string
{
$correlationId = Str::uuid()->toString();
$replyQueue = "replies.{$correlationId}";
// Create temporary reply queue
$this->createTemporaryQueue($replyQueue);
// Send request with reply metadata
$this->publish($queue, [
'payload' => $payload,
'reply_to' => $replyQueue,
'correlation_id' => $correlationId,
]);
return $correlationId;
}
public function getReply(string $correlationId, int $timeout = 30): ?array
{
$replyQueue = "replies.{$correlationId}";
$message = $this->consumeWithTimeout($replyQueue, $timeout);
$this->deleteQueue($replyQueue);
return $message;
}
}
Competing consumers scale processing by having multiple workers consume from the same queue. The queue ensures each message goes to exactly one consumer.
// Run multiple workers for parallel processing
// Each worker processes different messages
class ProcessImageJob implements ShouldQueue
{
public $queue = 'image-processing';
public function handle(): void
{
// Multiple workers can process images in parallel
$this->imageService->process($this->imageId);
}
}
// supervisor.conf - run 4 workers
// [program:image-worker]
// command=php artisan queue:work --queue=image-processing
// numprocs=4
Message routing directs messages to appropriate queues based on content or metadata.
class MessageRouter
{
private array $routes = [];
public function route(string $pattern, string $queue): self
{
$this->routes[$pattern] = $queue;
return $this;
}
public function dispatch(array $message): void
{
$type = $message['type'] ?? 'default';
foreach ($this->routes as $pattern => $queue) {
if (preg_match($pattern, $type)) {
$this->publish($queue, $message);
return;
}
}
throw new UnroutableMessageException($type);
}
}
// Usage
$router = new MessageRouter();
$router->route('/^order\./', 'orders');
$router->route('/^payment\./', 'payments');
$router->route('/^notification\./', 'notifications');
Reliability Patterns
Messages can be lost at multiple points: before reaching the broker, in the broker, or during consumer processing. Reliability patterns address each failure mode.
Publisher confirms ensure messages reach the broker:
class ReliablePublisher
{
public function publish(string $queue, array $message): void
{
$channel = $this->connection->channel();
$channel->confirm_select();
$channel->basic_publish(
new AMQPMessage(json_encode($message)),
'',
$queue
);
// Wait for broker acknowledgment
$channel->wait_for_pending_acks(5.0);
}
}
Consumer acknowledgments ensure messages aren't lost during processing:
class ReliableConsumer
{
public function consume(string $queue, callable $handler): void
{
$channel = $this->connection->channel();
$channel->basic_qos(0, 1, false); // One message at a time
$channel->basic_consume(
$queue,
'',
false, // no_local
false, // no_ack - we'll ack manually
false, // exclusive
false, // nowait
function ($message) use ($handler, $channel) {
try {
$handler(json_decode($message->body, true));
// Acknowledge after successful processing
$channel->basic_ack($message->delivery_info['delivery_tag']);
} catch (Exception $e) {
// Reject and requeue on failure
$channel->basic_nack(
$message->delivery_info['delivery_tag'],
false, // multiple
true // requeue
);
}
}
);
}
}
Dead letter queues capture messages that can't be processed:
class DeadLetterHandler
{
public function handleFailedMessage(array $message, Exception $error): void
{
$deadLetter = [
'original_message' => $message,
'error' => $error->getMessage(),
'failed_at' => now()->toIso8601String(),
'retry_count' => ($message['_retry_count'] ?? 0) + 1,
];
$this->publish('dead-letters', $deadLetter);
Log::error('Message moved to dead letter queue', [
'message_id' => $message['id'] ?? null,
'error' => $error->getMessage(),
]);
}
}
Ordering and Idempotency
Message ordering isn't guaranteed in most systems. Messages may arrive out of order due to retries, parallel processing, or network delays. Design consumers to handle this.
class OrderedMessageProcessor
{
public function process(array $message): void
{
$entityId = $message['entity_id'];
$sequenceNumber = $message['sequence'];
$lastProcessed = Cache::get("sequence:$entityId", 0);
if ($sequenceNumber <= $lastProcessed) {
// Already processed or out of order
Log::info('Skipping out-of-order message', [
'entity_id' => $entityId,
'sequence' => $sequenceNumber,
'last_processed' => $lastProcessed,
]);
return;
}
// Process message
$this->handleMessage($message);
Cache::put("sequence:$entityId", $sequenceNumber);
}
}
Idempotent consumers produce the same result regardless of how many times they process a message. This is essential for at-least-once delivery systems.
class IdempotentOrderProcessor implements ShouldQueue
{
public function handle(): void
{
$messageId = $this->message['id'];
// Check if already processed
$processed = DB::table('processed_messages')
->where('message_id', $messageId)
->exists();
if ($processed) {
Log::info('Duplicate message skipped', ['message_id' => $messageId]);
return;
}
DB::transaction(function () use ($messageId) {
// Process the order
$this->processOrder($this->message);
// Mark as processed
DB::table('processed_messages')->insert([
'message_id' => $messageId,
'processed_at' => now(),
]);
});
}
}
Saga Pattern
Sagas coordinate distributed transactions through a sequence of local transactions, each publishing events that trigger the next step. Compensating transactions handle failures.
class OrderSaga
{
public function start(Order $order): void
{
// Step 1: Reserve inventory
Event::dispatch(new ReserveInventoryRequested($order));
}
public function onInventoryReserved(InventoryReserved $event): void
{
// Step 2: Charge payment
Event::dispatch(new ChargePaymentRequested($event->order));
}
public function onPaymentCharged(PaymentCharged $event): void
{
// Step 3: Complete order
$event->order->update(['status' => 'completed']);
Event::dispatch(new OrderCompleted($event->order));
}
// Compensating transactions
public function onPaymentFailed(PaymentFailed $event): void
{
// Compensate: Release reserved inventory
Event::dispatch(new ReleaseInventoryRequested($event->order));
$event->order->update(['status' => 'payment_failed']);
}
public function onInventoryReservationFailed(InventoryReservationFailed $event): void
{
// Nothing to compensate, just fail the order
$event->order->update(['status' => 'insufficient_inventory']);
}
}
Monitoring and Observability
Monitor queue depth, processing time, and error rates to ensure healthy message processing.
class QueueMetrics
{
public function collect(): array
{
return [
'queue_depth' => $this->getQueueDepths(),
'processing_rate' => $this->getProcessingRate(),
'error_rate' => $this->getErrorRate(),
'oldest_message_age' => $this->getOldestMessageAge(),
];
}
private function getQueueDepths(): array
{
$queues = ['orders', 'payments', 'notifications'];
$depths = [];
foreach ($queues as $queue) {
$depths[$queue] = Queue::size($queue);
}
return $depths;
}
}
Alert on growing queues (consumers can't keep up), high error rates (processing problems), and old messages (stuck processing).
Conclusion
Asynchronous messaging enables resilient, scalable distributed systems. Message queues provide reliable work distribution. Event buses enable loose coupling between services. Reliability patterns prevent message loss. Idempotent consumers handle duplicates safely.
Choose patterns based on your requirements. Point-to-point queues for work distribution. Pub/sub for event notification. Sagas for distributed transactions. The right combination of patterns creates systems that handle failures gracefully and scale with demand.