
Laravel Queues in High-Load Applications – Architecture for 1M+ Jobs/Day

Shahroz Javed
Apr 04, 2026

Designing Queue Topology for High Load

Queue topology — which queues exist, what jobs go in each, how many workers consume each — is the architectural decision that determines whether your system holds up under load or falls apart. Poor topology means a surge of low-priority jobs starves critical ones; good topology gives you isolation, prioritization, and independent scaling.

The core design principle: jobs that must not affect each other should not share a queue. A video encoding job that takes 5 minutes should never share a queue with a payment confirmation job that should complete in under a second. If they share a queue, a burst of video jobs creates a multi-minute delay for payment confirmations.

// config/horizon.php — production topology for a media platform
'environments' => [
    'production' => [

        // Critical path: payments, auth, security — SLA < 2 seconds
        'supervisor-critical' => [
            'queue'        => ['critical'],
            'balance'      => 'simple',
            'maxProcesses' => 5,
            'minProcesses' => 3,   // always warm — never scaled to zero
            'timeout'      => 30,
            'tries'        => 3,
        ],

        // User-facing real-time: notifications, webhooks — SLA < 10 seconds
        'supervisor-realtime' => [
            'queue'        => ['webhooks', 'notifications-push', 'notifications-email'],
            'balance'      => 'auto',
            'maxProcesses' => 20,
            'minProcesses' => 2,
            'timeout'      => 60,
        ],

        // Media processing: CPU-intensive, long-running — SLA: minutes
        'supervisor-media' => [
            'queue'        => ['video-transcode', 'image-process'],
            'balance'      => 'simple',
            'maxProcesses' => 8,   // GPU/CPU limited — don't over-provision
            'minProcesses' => 1,
            'timeout'      => 900, // 15 minutes
            'memory'       => 1024,
        ],

        // Batch/analytics: low priority, runs in background — SLA: hours
        'supervisor-batch' => [
            'queue'        => ['analytics', 'reporting', 'data-export'],
            'balance'      => 'auto',
            'maxProcesses' => 4,
            'minProcesses' => 0,   // can scale to zero during peak
            'timeout'      => 3600,
        ],

        // External integrations: rate-limited per service
        'supervisor-integrations' => [
            'queue'        => ['stripe', 'salesforce', 'mailchimp'],
            'balance'      => 'auto',
            'maxProcesses' => 6,
            'minProcesses' => 1,
            'timeout'      => 120,
        ],
    ],
],

Video & Image Processing Pipeline

A production video processing pipeline must handle: upload completion detection, multiple output quality variants, CDN distribution, thumbnail generation, and failure recovery — all as a choreographed sequence of jobs rather than one monolithic job.

// Pipeline architecture: S3 event → chain of specialized jobs
// Triggered by S3 ObjectCreated event via SQS or directly from upload controller

class VideoUploadedJob implements ShouldQueue
{
    public int $tries   = 3;
    public int $timeout = 60;

    public function __construct(
        private readonly int    $videoId,
        private readonly string $s3Key,     // e.g., "uploads/raw/uuid.mp4"
    ) {}

    public function handle(VideoRepository $videos, S3Client $s3): void
    {
        $video = $videos->findOrFail($this->videoId);

        // Validate the upload before starting expensive processing
        $metadata = $s3->getObjectMetadata($this->s3Key);

        if ($metadata['size'] > 5_368_709_120) { // 5 GB limit
            $video->update(['status' => 'rejected', 'rejection_reason' => 'file_too_large']);
            return;
        }

        $video->update(['status' => 'processing', 'raw_s3_key' => $this->s3Key]);

        // Fan-out to parallel quality transcodes
        // Each quality level is independent — they can run concurrently
        $transcodingJobs = collect(['360p', '720p', '1080p', '4k'])
            ->filter(fn ($quality) => $this->qualityNeeded($metadata, $quality))
            ->map(fn ($quality) => new TranscodeVideoJob($this->videoId, $this->s3Key, $quality));

        // After ALL transcodes complete, chain the post-processing jobs.
        // Note: Batch then()/catch() take closures, not job instances, and
        // every batched job must use the Illuminate\Bus\Batchable trait.
        Bus::batch($transcodingJobs->all())
            ->then(fn (Batch $batch) => Bus::chain([
                new GenerateThumbnailsJob($this->videoId),
                new InvalidateCdnCacheJob($this->videoId),
                new MarkVideoReadyJob($this->videoId),
                new NotifySubscribersJob($this->videoId),
            ])->dispatch())
            ->catch(fn (Batch $batch, Throwable $e) => HandleVideoProcessingFailureJob::dispatch($this->videoId))
            ->onQueue('video-transcode')
            ->dispatch();
    }

    private function qualityNeeded(array $metadata, string $quality): bool
    {
        $minimumHeights = ['360p' => 360, '720p' => 720, '1080p' => 1080, '4k' => 2160];
        return ($metadata['height'] ?? 0) >= $minimumHeights[$quality];
    }
}

// The actual transcode job — shells out to ffmpeg or calls a transcoding service
class TranscodeVideoJob implements ShouldQueue
{
    use Batchable; // required for any job dispatched via Bus::batch()

    public int $timeout = 900; // 15 minutes for long videos

    public function __construct(
        private readonly int    $videoId,
        private readonly string $sourceKey,
        private readonly string $quality,
    ) {}

    public function handle(VideoTranscoder $transcoder): void
    {
        $outputKey = "processed/{$this->videoId}/{$this->quality}.mp4";

        $transcoder->transcode(
            source:  $this->sourceKey,
            output:  $outputKey,
            quality: $this->quality,
        );

        // Record the output for this quality level
        VideoVariant::create([
            'video_id' => $this->videoId,
            'quality'  => $this->quality,
            's3_key'   => $outputKey,
            'cdn_url'  => "https://cdn.yourapp.com/{$outputKey}",
        ]);
    }
}

// Image processing pipeline — simpler but same principles
class ProcessUploadedImageJob implements ShouldQueue
{
    public int $timeout = 120;

    public function __construct(
        private readonly int    $imageId,
        private readonly string $s3Key,
    ) {}

    public function handle(ImageProcessor $processor): void
    {
        // Generate all size variants in parallel via batching
        $sizes = [
            ['name' => 'thumbnail',  'width' => 150,  'height' => 150,  'fit' => 'crop'],
            ['name' => 'preview',    'width' => 800,  'height' => 600,  'fit' => 'contain'],
            ['name' => 'full',       'width' => 1920, 'height' => 1080, 'fit' => 'max'],
            ['name' => 'webp',       'width' => null, 'height' => null, 'format' => 'webp'],
        ];

        Bus::batch(
            collect($sizes)
                ->map(fn ($size) => new ResizeImageJob($this->imageId, $this->s3Key, $size))
                ->all()
        )
        ->then(fn (Batch $batch) => MarkImageReadyJob::dispatch($this->imageId))
        ->onQueue('image-process')
        ->dispatch();
    }
}

Bulk Notification Fan-Out Architecture

Sending 1 million push notifications is not a single job — it's a fan-out tree. A single coordinator job reads the audience, splits it into chunks, and dispatches per-chunk delivery jobs that are further specialized by delivery channel. This architecture allows independent failure handling per channel and per chunk.

// Fan-out pattern: coordinator → channel dispatchers → per-user senders
// Level 1: Campaign coordinator — dispatches per-channel jobs
class DispatchNotificationCampaignJob implements ShouldQueue
{
    public int $timeout = 300;

    public function __construct(private readonly int $campaignId) {}

    public function handle(): void
    {
        $campaign = NotificationCampaign::findOrFail($this->campaignId);

        $campaign->update(['status' => 'dispatching', 'dispatch_started_at' => now()]);

        // Stream through the audience without loading all IDs into memory
        // Use chunks of 1000 to balance dispatch overhead vs memory usage
        $chunkIndex = 0;
        $campaign->audience()
            ->select('user_id')
            ->cursor()
            ->chunk(1000)
            ->each(function ($chunk) use ($campaign, &$chunkIndex) {
                $userIds = $chunk->pluck('user_id')->toArray();

                // Dispatch per-channel jobs for this chunk
                if ($campaign->channels->contains('push')) {
                    SendPushNotificationBatchJob::dispatch($campaign->id, $userIds)
                        ->onQueue('notifications-push');
                }

                if ($campaign->channels->contains('email')) {
                    SendEmailBatchJob::dispatch($campaign->id, $userIds)
                        ->delay(now()->addSeconds($chunkIndex * 2)) // stagger email batches
                        ->onQueue('notifications-email');
                }

                if ($campaign->channels->contains('sms')) {
                    SendSmsBatchJob::dispatch($campaign->id, $userIds)
                        ->delay(now()->addSeconds($chunkIndex * 5)) // SMS has tightest rate limits
                        ->onQueue('notifications-sms');
                }

                $chunkIndex++;
            });

        $campaign->update(['status' => 'dispatched']);
    }
}

// Level 2: Per-channel batch sender
class SendPushNotificationBatchJob implements ShouldQueue
{
    public int $timeout = 120;

    public function __construct(
        private readonly int   $campaignId,
        private readonly array $userIds,
    ) {}

    public function handle(PushNotificationService $push): void
    {
        $campaign = NotificationCampaign::find($this->campaignId);
        if (! $campaign || $campaign->status === 'cancelled') {
            return; // campaign was cancelled mid-dispatch
        }

        // Fetch device tokens for this batch
        $tokens = PushToken::whereIn('user_id', $this->userIds)
            ->where('platform', 'fcm')
            ->whereNotNull('token')
            ->get();

        // Use FCM multicast — sends to up to 500 tokens in one API call
        $tokens->chunk(500)->each(function ($batch) use ($campaign, $push) {
            $result = $push->sendMulticast(
                tokens:  $batch->pluck('token')->toArray(),
                title:   $campaign->title,
                body:    $campaign->body,
                data:    $campaign->metadata,
            );

            // Track delivery stats
            NotificationDelivery::insert(
                $result->map(fn ($r) => [
                    'campaign_id' => $this->campaignId,
                    'token_id'    => $r->token_id,
                    'status'      => $r->success ? 'delivered' : 'failed',
                    'error_code'  => $r->errorCode,
                    'sent_at'     => now(),
                ])->toArray()
            );
        });
    }
}

API Rate Limiting via Queue Workers

When you integrate with third-party APIs that have rate limits (Stripe: 100 req/s, Twilio: 1 req/s per account, Mailchimp: 10 simultaneous calls), the queue becomes your rate-limiting layer. The pattern: dedicate a queue per API, dedicate a fixed number of workers per queue (to cap concurrency), and use Laravel's RateLimiter for within-worker throttling.

// Per-service queue architecture with worker count as concurrency control
// config/horizon.php (relevant supervisors)

'supervisor-stripe' => [
    'queue'        => ['stripe'],
    'balance'      => 'simple',    // DON'T use auto-balance — fixed concurrency is intentional
    'maxProcesses' => 10,          // Stripe allows ~100 req/s; 10 workers * 10 jobs/s each = 100 req/s max
    'minProcesses' => 10,          // hold steady, don't scale down
    'timeout'      => 30,
],

'supervisor-twilio' => [
    'queue'        => ['twilio'],
    'balance'      => 'simple',
    'maxProcesses' => 1,           // Twilio: 1 concurrent request per account number
    'minProcesses' => 1,
    'timeout'      => 15,
],

// Laravel's RateLimited job middleware for fine-grained throttling within workers:
class SendSmsViaTwilioJob implements ShouldQueue
{
    public function middleware(): array
    {
        return [
            // Allow max 1 SMS per second using RateLimiter
            new RateLimited('twilio-sms'),
        ];
    }
}

// Register the limiter in AppServiceProvider:
RateLimiter::for('twilio-sms', function (object $job) {
    return Limit::perSecond(1); // 1 SMS per second across all workers
});

// Token bucket implementation for APIs that need burst capacity
// Allows bursting up to N requests but sustains at a lower average rate
class TokenBucketRateLimiter
{
    private const BUCKET_KEY    = 'rate_limit:bucket:';
    private const TOKENS_KEY    = ':tokens';
    private const LAST_FILL_KEY = ':last_fill';

    public function __construct(
        private readonly string $apiName,
        private readonly int    $capacity,    // max burst
        private readonly int    $fillRate,    // tokens added per second
    ) {}

    public function consume(int $tokens = 1): bool
    {
        $key = self::BUCKET_KEY . $this->apiName;

        // block() throws LockTimeoutException if the lock isn't acquired within 0.1s
        return Cache::lock($key . ':lock', 1)->block(0.1, function () use ($key, $tokens) {
            $now      = microtime(true);
            $current  = (float) Cache::get($key . self::TOKENS_KEY, $this->capacity);
            $lastFill = (float) Cache::get($key . self::LAST_FILL_KEY, $now);

            // Refill tokens based on elapsed time
            $elapsed  = $now - $lastFill;
            $refilled = min($this->capacity, $current + ($elapsed * $this->fillRate));

            if ($refilled < $tokens) {
                return false; // not enough tokens
            }

            Cache::put($key . self::TOKENS_KEY, $refilled - $tokens, 3600);
            Cache::put($key . self::LAST_FILL_KEY, $now, 3600);

            return true;
        });
    }
}

// Use in job middleware — middleware are plain classes with a handle() method
// (Laravel defines no JobMiddleware interface):
class ThrottleWithTokenBucket
{
    public function __construct(
        private readonly TokenBucketRateLimiter $limiter,
    ) {}

    public function handle(mixed $job, callable $next): void
    {
        if (! $this->limiter->consume()) {
            // Release back to queue to retry after token refill period
            $job->release(1); // retry in 1 second
            return;
        }
        $next($job);
    }
}
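The refill arithmetic inside `consume()` can be exercised in isolation. The framework-free sketch below (class name and numbers are illustrative, with time passed in explicitly instead of `microtime()`) demonstrates the burst-then-throttle behavior:

```php
<?php
// In-memory sketch of the token-bucket refill math — no Redis/Cache required.
final class InMemoryTokenBucket
{
    private float $tokens;
    private float $lastFill;

    public function __construct(
        private readonly int $capacity,   // max burst
        private readonly int $fillRate,   // tokens added per second
    ) {
        $this->tokens   = $capacity;
        $this->lastFill = 0.0;
    }

    public function consume(float $now, int $tokens = 1): bool
    {
        // Refill based on elapsed time, capped at capacity
        $this->tokens   = min($this->capacity, $this->tokens + ($now - $this->lastFill) * $this->fillRate);
        $this->lastFill = $now;

        if ($this->tokens < $tokens) {
            return false;
        }
        $this->tokens -= $tokens;
        return true;
    }
}

$bucket = new InMemoryTokenBucket(capacity: 5, fillRate: 1);

$burst = 0;
for ($i = 0; $i < 10; $i++) {       // 10 requests at t=0: only the burst capacity succeeds
    if ($bucket->consume(0.0)) {
        $burst++;
    }
}
$early = $bucket->consume(0.5);     // t=0.5s: only half a token refilled — refused
$later = $bucket->consume(1.0);     // t=1.0s: a full token has refilled — allowed

var_dump($burst, $early, $later);   // int(5), bool(false), bool(true)
```

The production version above adds the `Cache::lock` wrapper so multiple workers share one bucket; the math is identical.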

Handling 1M+ Jobs Per Day

One million jobs per day is approximately 11.6 jobs per second on average. That sounds manageable — until you consider that real-world load is not uniform. Marketing emails fire at 9am, batch syncs run at midnight, and user-driven events spike at lunch. Your system must handle 5–10x the average rate during peaks.

// Capacity planning math for 1M jobs/day at 10x peak:
//
// Average rate:    1,000,000 / 86,400 = 11.6 jobs/second
// Peak rate (10x): 11.6 * 10 = 116 jobs/second
//
// Per-worker throughput (Redis, mixed jobs, p50 duration 200ms):
//   1 job / 200ms = 5 jobs/second per worker
//
// Workers needed at peak: 116 / 5 = 23.2 → 25 workers (with headroom)
//
// Redis ops per job (pop, delete, update delayed set): ~6 ops
// Redis peak load: 116 * 6 = 696 ops/second → well within single Redis capacity
//
// Database connections at peak (25 workers, each holding 1 connection): 25
// Plus web tier: say 50 connections
// Total: 75 — comfortable for MySQL max_connections=200
//
// Memory per worker (Laravel + application): ~128 MB
// Total worker memory: 25 * 128 = 3.2 GB → fits on 8GB worker server with headroom
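The same arithmetic as a quick sanity check you can run anywhere — plain PHP, no framework, using the assumptions stated above (10x peak factor, 200 ms p50 job duration):

```php
<?php
// Back-of-the-envelope capacity math for 1M jobs/day
$jobsPerDay = 1_000_000;
$peakFactor = 10;       // assumed peak-to-average ratio
$p50Seconds = 0.200;    // assumed p50 job duration (200 ms)

$avgRate   = $jobsPerDay / 86_400;               // ~11.6 jobs/second
$peakRate  = $avgRate * $peakFactor;             // ~116 jobs/second
$perWorker = 1 / $p50Seconds;                    // 5 jobs/second per worker
$workers   = (int) ceil($peakRate / $perWorker); // 24 — provision 25 for headroom

printf("avg=%.1f/s peak=%.1f/s workers=%d\n", $avgRate, $peakRate, $workers);
// prints "avg=11.6/s peak=115.7/s workers=24"
```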

// Architecture recommendation for 1M+/day:
// - Redis backend (mandatory)
// - 2 worker servers (8 workers each = 16 baseline) + auto-scale to 40 during peaks
// - Horizon with auto-balance between queues
// - Separate Redis instance from application cache (isolate queue I/O)
// - Failed job monitoring with alert on >0.1% failure rate

// Optimizing job dispatch for bulk operations
// Dispatching 100,000 jobs one-by-one is slow — use pipeline for Redis batch insert

class BulkDispatcher
{
    public function dispatch(array $jobs): void
    {
        // Use Redis pipeline to batch-insert up to 1000 jobs at once
        $chunks = array_chunk($jobs, 1000);

        foreach ($chunks as $chunk) {
            Redis::pipeline(function ($pipe) use ($chunk) {
                foreach ($chunk as $job) {
                    $payload = $this->createPayload($job);
                    $pipe->rpush("queues:{$job->queue}", $payload);
                }
            });
        }
    }

    private function createPayload(object $job): string
    {
        // Minimal payload sketch — a real Laravel payload carries more fields
        // (attempts, backoff, etc.), and raw RPUSH bypasses Horizon's bookkeeping.
        return json_encode([
            'uuid'        => (string) Str::uuid(),
            'displayName' => get_class($job),
            'job'         => 'Illuminate\Queue\CallQueuedHandler@call',
            'maxTries'    => $job->tries ?? null,
            'timeout'     => $job->timeout ?? null,
            'data'        => [
                'commandName' => get_class($job),
                'command'     => serialize(clone $job),
            ],
        ]);
    }
}

// Dispatching 100,000 jobs this way:
// Individual dispatch: ~25 seconds (100,000 Redis round-trips)
// Pipeline dispatch:   ~0.8 seconds (100 pipeline batches of 1000)

// Monitoring queue health at scale with Prometheus metrics
// Exported via custom artisan command every 30 seconds

class QueueMetricsCommand extends Command
{
    protected $signature = 'metrics:queue-export';

    public function handle(): void
    {
        $queues = ['critical', 'notifications-push', 'notifications-email',
                   'video-transcode', 'stripe', 'analytics'];

        $metrics = [];
        foreach ($queues as $queue) {
            $metrics[] = sprintf(
                'laravel_queue_size{queue="%s"} %d %d',
                $queue,
                Redis::llen("queues:{$queue}") + Redis::zcard("queues:{$queue}:delayed"),
                now()->getTimestampMs()
            );
        }

        // Post to Prometheus Pushgateway or write to file for Node Exporter
        file_put_contents('/var/lib/prometheus/queue_metrics.prom',
            implode("\n", $metrics) . "\n"
        );
    }
}
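For reference, the exported file follows the Prometheus text exposition format — one sample per line, metric name, labels, value, and an optional millisecond timestamp (the values below are purely illustrative):

```text
laravel_queue_size{queue="critical"} 12 1743724800000
laravel_queue_size{queue="notifications-push"} 431 1743724800000
laravel_queue_size{queue="video-transcode"} 87 1743724800000
```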

Queue Topology Design – One Queue vs Many

The choice between a single default queue and many specialized queues is a real architectural decision with concrete tradeoffs. Here is the honest analysis:

  • One queue: simple to operate, no routing logic, every worker can process every job. Works well up to a few thousand jobs/day with uniform job types. Fails when a burst of slow jobs blocks fast ones.

  • Many queues: independent priority, independent scaling, independent failure domains. Required above moderate scale or when job types have significantly different SLAs. Adds operational complexity (more supervisors to configure, more metrics to track).

// Queue routing by job type — in each job class
class SendTransactionalEmailJob implements ShouldQueue
{
    public string $queue = 'notifications-email'; // declared on the class
}

// Or set at dispatch time:
ProcessVideoJob::dispatch($videoId)->onQueue('video-transcode');

// Queue routing by tenant (multi-tenant apps):
class TenantAwareJob implements ShouldQueue
{
    public string $queue;

    public function __construct(private readonly int $tenantId)
    {
        // High-value tenants get dedicated queue with more workers
        $tier        = Tenant::find($tenantId)?->tier ?? 'standard';
        $this->queue = match($tier) {
            'enterprise' => 'enterprise-processing',
            'pro'        => 'pro-processing',
            default      => 'standard-processing',
        };
    }
}

// When to add a new queue — the signals:
// 1. A specific job type is blocking other jobs (high-volume, slow jobs)
// 2. A specific job type needs SLA guarantees different from others
// 3. A specific job type uses a different external resource (separate rate limit pool)
// 4. A specific job type needs dedicated infrastructure (GPU server for ML jobs)
// 5. A specific tenant needs isolation (enterprise tier SLA)

Conclusion

High-load queue systems are won or lost at the architecture stage, not the code optimization stage. The decisions that matter most:

  • Topology first — separate queues for jobs with different SLAs, resource profiles, and failure characteristics. This is the foundational decision everything else builds on.

  • Fan-out for bulk operations — never put a million units of work in one job. A coordinator dispatches chunk jobs, which dispatch per-unit jobs if needed. Each level is independently retryable.

  • Worker count as rate control — for rate-limited APIs, the number of dedicated workers is your primary throttle. Don't over-provision them.

  • Bulk dispatch via pipeline — when dispatching tens of thousands of jobs programmatically, Redis pipeline mode reduces dispatch time by 30–50x.

  • Metrics as first-class infrastructure — at 1M+ jobs/day, you cannot operate blind. Queue depth per queue, p95 processing time, and failed job rate must be in your monitoring dashboard.
