setName('task') ->addArgument('status', Argument::REQUIRED, 'start/stop/status') ->addOption('d', null, Option::VALUE_NONE, 'run as daemon') ->setDescription('start/stop/status async task executor'); } protected function init(Input $input, Output $output) { global $argv; $argv[1] = $input->getArgument('status') ?: 'start'; if ($input->hasOption('d')) { $argv[2] = '-d'; } else { unset($argv[2]); } } protected function execute(Input $input, Output $output) { $this->init($input, $output); Worker::$pidFile = app()->getRootPath() . 'task.pid'; $task = new Worker(); $task->count = 1; $task->onWorkerStart = [$this, 'start']; $task->onWorkerStop = [$this, 'stop']; $task->runAll(); } public function start() { $tube = Config::get('app.beanstalk_tube'); Beanstalk::watch($tube); while (true) { $job = Beanstalk::reserveWithTimeout(30); if (is_null($job)) { continue; } $executed = false; try { $str_payload = $job->getData(); $payload = json_decode($str_payload, true); $asyncTasks = [ new WechatNotify(), new AsyncClass(), new AsyncSms(), ]; foreach ($asyncTasks as $t) { if ($t->getCmd() == $payload['cmd']) { $retval = $t->exec($payload); if (!$retval) { warnlog('async task executed return false:' . $str_payload); } infolog("task: $str_payload return:" . json_encode($retval)); $executed = true; } } if (!$executed) { errlog('task unsupported cmd:' . $str_payload); // Beanstalk::release($job, 1024, 30); Beanstalk::delete($job); continue; } Beanstalk::delete($job); } catch (\Exception $e) { errlog('task exception:' . $e->getMessage() . ' data:' . $job->getData()); // Beanstalk::release($job, 1024, 30); Beanstalk::delete($job); // testing } } // while } public function stop() { echo 'task stopped'; } }