Asynchronous Messaging Patterns

Philip Rehberger Feb 14, 2026 6 min read

Design reliable async communication. Learn competing consumers, saga pattern, and transactional outbox.

Asynchronous Messaging Patterns

Asynchronous messaging decouples services by allowing them to communicate without waiting for immediate responses. Instead of direct synchronous calls, services exchange messages through intermediaries like message queues or event buses. This pattern improves resilience, scalability, and flexibility in distributed systems.

When services communicate synchronously, they're coupled in time; both must be available simultaneously. Asynchronous messaging removes this coupling. The sender doesn't wait for the receiver. If the receiver is temporarily unavailable, messages queue until it recovers. This temporal decoupling is fundamental to building resilient systems.

Message Queues vs Event Buses

Message queues provide point-to-point communication. A message is delivered to exactly one consumer, even if multiple consumers exist. Queues are appropriate when work needs to be processed exactly once.

// Publishing to a queue
class OrderService
{
    public function createOrder(array $data): Order
    {
        $order = Order::create($data);

        // Queue work for single consumer
        Queue::push(new ProcessOrderPayment($order->id));

        return $order;
    }
}

// Consuming from queue
class ProcessOrderPayment implements ShouldQueue
{
    public function handle(): void
    {
        $order = Order::findOrFail($this->orderId);
        $this->paymentService->charge($order);
    }
}

Event buses (pub/sub) provide one-to-many communication. Multiple subscribers receive each event. Events are appropriate when multiple services need to react to the same occurrence.

// Publishing an event
class OrderService
{
    public function completeOrder(Order $order): void
    {
        $order->update(['status' => 'completed']);

        // Multiple subscribers will receive this
        Event::dispatch(new OrderCompleted($order));
    }
}

// Multiple subscribers
class SendOrderConfirmation
{
    public function handle(OrderCompleted $event): void
    {
        Mail::send(new OrderConfirmationMail($event->order));
    }
}

class UpdateInventory
{
    public function handle(OrderCompleted $event): void
    {
        foreach ($event->order->items as $item) {
            $this->inventoryService->decrease($item->sku, $item->quantity);
        }
    }
}

class NotifyWarehouse
{
    public function handle(OrderCompleted $event): void
    {
        $this->warehouseApi->createShipment($event->order);
    }
}

Message Patterns

Request-reply enables async communication that still returns a result. The requester includes a reply-to address; the responder sends the result there.

class AsyncRequestService
{
    public function request(string $queue, array $payload): string
    {
        $correlationId = Str::uuid()->toString();
        $replyQueue = "replies.{$correlationId}";

        // Create temporary reply queue
        $this->createTemporaryQueue($replyQueue);

        // Send request with reply metadata
        $this->publish($queue, [
            'payload' => $payload,
            'reply_to' => $replyQueue,
            'correlation_id' => $correlationId,
        ]);

        return $correlationId;
    }

    public function getReply(string $correlationId, int $timeout = 30): ?array
    {
        $replyQueue = "replies.{$correlationId}";

        $message = $this->consumeWithTimeout($replyQueue, $timeout);

        $this->deleteQueue($replyQueue);

        return $message;
    }
}

Competing consumers scale processing by having multiple workers consume from the same queue. The queue ensures each message goes to exactly one consumer.

// Run multiple workers for parallel processing
// Each worker processes different messages

class ProcessImageJob implements ShouldQueue
{
    public $queue = 'image-processing';

    public function handle(): void
    {
        // Multiple workers can process images in parallel
        $this->imageService->process($this->imageId);
    }
}

// supervisor.conf - run 4 workers
// [program:image-worker]
// command=php artisan queue:work --queue=image-processing
// numprocs=4

Message routing directs messages to appropriate queues based on content or metadata.

class MessageRouter
{
    private array $routes = [];

    public function route(string $pattern, string $queue): self
    {
        $this->routes[$pattern] = $queue;
        return $this;
    }

    public function dispatch(array $message): void
    {
        $type = $message['type'] ?? 'default';

        foreach ($this->routes as $pattern => $queue) {
            if (preg_match($pattern, $type)) {
                $this->publish($queue, $message);
                return;
            }
        }

        throw new UnroutableMessageException($type);
    }
}

// Usage
$router = new MessageRouter();
$router->route('/^order\./', 'orders');
$router->route('/^payment\./', 'payments');
$router->route('/^notification\./', 'notifications');

Reliability Patterns

Messages can be lost at multiple points: before reaching the broker, in the broker, or during consumer processing. Reliability patterns address each failure mode.

Publisher confirms ensure messages reach the broker:

class ReliablePublisher
{
    public function publish(string $queue, array $message): void
    {
        $channel = $this->connection->channel();
        $channel->confirm_select();

        $channel->basic_publish(
            new AMQPMessage(json_encode($message)),
            '',
            $queue
        );

        // Wait for broker acknowledgment
        $channel->wait_for_pending_acks(5.0);
    }
}

Consumer acknowledgments ensure messages aren't lost during processing:

class ReliableConsumer
{
    public function consume(string $queue, callable $handler): void
    {
        $channel = $this->connection->channel();

        $channel->basic_qos(0, 1, false);  // One message at a time

        $channel->basic_consume(
            $queue,
            '',
            false,  // no_local
            false,  // no_ack - we'll ack manually
            false,  // exclusive
            false,  // nowait
            function ($message) use ($handler, $channel) {
                try {
                    $handler(json_decode($message->body, true));

                    // Acknowledge after successful processing
                    $channel->basic_ack($message->delivery_info['delivery_tag']);

                } catch (Exception $e) {
                    // Reject and requeue on failure
                    $channel->basic_nack(
                        $message->delivery_info['delivery_tag'],
                        false,  // multiple
                        true    // requeue
                    );
                }
            }
        );
    }
}

Dead letter queues capture messages that can't be processed:

class DeadLetterHandler
{
    public function handleFailedMessage(array $message, Exception $error): void
    {
        $deadLetter = [
            'original_message' => $message,
            'error' => $error->getMessage(),
            'failed_at' => now()->toIso8601String(),
            'retry_count' => ($message['_retry_count'] ?? 0) + 1,
        ];

        $this->publish('dead-letters', $deadLetter);

        Log::error('Message moved to dead letter queue', [
            'message_id' => $message['id'] ?? null,
            'error' => $error->getMessage(),
        ]);
    }
}

Ordering and Idempotency

Message ordering isn't guaranteed in most systems. Messages may arrive out of order due to retries, parallel processing, or network delays. Design consumers to handle this.

class OrderedMessageProcessor
{
    public function process(array $message): void
    {
        $entityId = $message['entity_id'];
        $sequenceNumber = $message['sequence'];

        $lastProcessed = Cache::get("sequence:$entityId", 0);

        if ($sequenceNumber <= $lastProcessed) {
            // Already processed or out of order
            Log::info('Skipping out-of-order message', [
                'entity_id' => $entityId,
                'sequence' => $sequenceNumber,
                'last_processed' => $lastProcessed,
            ]);
            return;
        }

        // Process message
        $this->handleMessage($message);

        Cache::put("sequence:$entityId", $sequenceNumber);
    }
}

Idempotent consumers produce the same result regardless of how many times they process a message. This is essential for at-least-once delivery systems.

class IdempotentOrderProcessor implements ShouldQueue
{
    public function handle(): void
    {
        $messageId = $this->message['id'];

        // Check if already processed
        $processed = DB::table('processed_messages')
            ->where('message_id', $messageId)
            ->exists();

        if ($processed) {
            Log::info('Duplicate message skipped', ['message_id' => $messageId]);
            return;
        }

        DB::transaction(function () use ($messageId) {
            // Process the order
            $this->processOrder($this->message);

            // Mark as processed
            DB::table('processed_messages')->insert([
                'message_id' => $messageId,
                'processed_at' => now(),
            ]);
        });
    }
}

Saga Pattern

Sagas coordinate distributed transactions through a sequence of local transactions, each publishing events that trigger the next step. Compensating transactions handle failures.

class OrderSaga
{
    public function start(Order $order): void
    {
        // Step 1: Reserve inventory
        Event::dispatch(new ReserveInventoryRequested($order));
    }

    public function onInventoryReserved(InventoryReserved $event): void
    {
        // Step 2: Charge payment
        Event::dispatch(new ChargePaymentRequested($event->order));
    }

    public function onPaymentCharged(PaymentCharged $event): void
    {
        // Step 3: Complete order
        $event->order->update(['status' => 'completed']);
        Event::dispatch(new OrderCompleted($event->order));
    }

    // Compensating transactions
    public function onPaymentFailed(PaymentFailed $event): void
    {
        // Compensate: Release reserved inventory
        Event::dispatch(new ReleaseInventoryRequested($event->order));
        $event->order->update(['status' => 'payment_failed']);
    }

    public function onInventoryReservationFailed(InventoryReservationFailed $event): void
    {
        // Nothing to compensate, just fail the order
        $event->order->update(['status' => 'insufficient_inventory']);
    }
}

Monitoring and Observability

Monitor queue depth, processing time, and error rates to ensure healthy message processing.

class QueueMetrics
{
    public function collect(): array
    {
        return [
            'queue_depth' => $this->getQueueDepths(),
            'processing_rate' => $this->getProcessingRate(),
            'error_rate' => $this->getErrorRate(),
            'oldest_message_age' => $this->getOldestMessageAge(),
        ];
    }

    private function getQueueDepths(): array
    {
        $queues = ['orders', 'payments', 'notifications'];
        $depths = [];

        foreach ($queues as $queue) {
            $depths[$queue] = Queue::size($queue);
        }

        return $depths;
    }
}

Alert on growing queues (consumers can't keep up), high error rates (processing problems), and old messages (stuck processing).

Conclusion

Asynchronous messaging enables resilient, scalable distributed systems. Message queues provide reliable work distribution. Event buses enable loose coupling between services. Reliability patterns prevent message loss. Idempotent consumers handle duplicates safely.

Choose patterns based on your requirements. Point-to-point queues for work distribution. Pub/sub for event notification. Sagas for distributed transactions. The right combination of patterns creates systems that handle failures gracefully and scale with demand.

Share this article

Related Articles

Need help with your project?

Let's discuss how we can help you build reliable software.