manager = $manager;
        $this->events = $events;
    }

    /**
     * Run the command: resolve the requested queues, print their sizes,
     * and dispatch an event for each queue that is not in the "OK" state.
     *
     * @return void
     */
    public function handle()
    {
        $monitored = $this->parseQueues($this->argument('queues'));

        $this->displaySizes($monitored);

        $this->dispatchEvents($monitored);
    }

    /**
     * Turn a comma-separated queue list into a collection of queue reports.
     *
     * Each entry may be written as "connection:queue" or just "queue"; in the
     * latter case the "queue.default" configuration value is used as the
     * connection. Every report holds the connection, the queue name, its
     * current size, and a status of "ALERT" when the size is at or above the
     * "max" option, otherwise "OK".
     *
     * @param  string  $queues
     * @return \Illuminate\Support\Collection
     */
    protected function parseQueues($queues)
    {
        return collect(explode(',', $queues))->map(function ($queue) {
            // Split on the first ":" only, so the queue name may itself contain colons.
            $segments = explode(':', $queue, 2);

            if (count($segments) === 2) {
                [$connection, $name] = $segments;
            } else {
                // No connection prefix was supplied: fall back to the default connection.
                $name = $segments[0];
                $connection = $this->laravel['config']['queue.default'];
            }

            $size = $this->manager->connection($connection)->size($name);
            $status = $size >= $this->option('max') ? 'ALERT' : 'OK';

            return [
                'connection' => $connection,
                'queue' => $name,
                'size' => $size,
                'status' => $status,
            ];
        });
    }

    /**
     * Print a two-column listing of each queue with its size and status.
     *
     * @param  \Illuminate\Support\Collection  $queues
     * @return void
     */
    protected function displaySizes(Collection $queues)
    {
        $this->newLine();

        // Header row.
        $this->components->twoColumnDetail('Queue name', 'Size / Status');

        foreach ($queues as $report) {
            $label = '['.$report['connection'].'] '.$report['queue'];
            $detail = '['.$report['size'].'] '.$report['status'];

            $this->components->twoColumnDetail($label, $detail);
        }

        $this->newLine();
    }

    /**
     * Dispatch a QueueBusy event for every queue whose status is not "OK".
     *
     * @param  \Illuminate\Support\Collection  $queues
     * @return void
     */
    protected function dispatchEvents(Collection $queues)
    {
        foreach ($queues as $report) {
            if ($report['status'] != 'OK') {
                $this->events->dispatch(
                    new QueueBusy(
                        $report['connection'],
                        $report['queue'],
                        $report['size'],
                    )
                );
            }
        }
    }
}