setName('task')
            ->addArgument('status', Argument::REQUIRED, 'start/stop/status')
            ->addOption('d', null, Option::VALUE_NONE, 'run as daemon')
            ->setDescription('start/stop/status async task executor');
    }

    /**
     * Mirror the console input into the global $argv array.
     *
     * $argv[1] receives the "status" argument (falling back to 'start' when it
     * is empty) and $argv[2] is set to '-d' or removed depending on the "d"
     * option. Presumably this is the layout Worker::runAll() reads its command
     * from -- confirm against the Worker implementation in use.
     *
     * @param Input  $input  Console input carrying the "status" argument and "d" option.
     * @param Output $output Console output (unused here).
     */
    protected function init(Input $input, Output $output)
    {
        global $argv;

        $argv[1] = $input->getArgument('status') ?: 'start';

        // NOTE(review): this relies on hasOption() reporting whether the flag
        // was actually passed, not merely declared -- confirm for the Input
        // class in use, otherwise daemon mode would always be selected.
        if ($input->hasOption('d')) {
            $argv[2] = '-d';
        } else {
            unset($argv[2]);
        }
    }

    /**
     * Command entry point: configure a single Worker and hand control to it.
     *
     * Sets the pid file to "<root path>task.pid", registers start()/stop() as
     * the worker start/stop callbacks and calls runAll().
     *
     * @param Input  $input  Console input.
     * @param Output $output Console output.
     */
    protected function execute(Input $input, Output $output)
    {
        $this->init($input, $output);

        Worker::$pidFile = app()->getRootPath() . 'task.pid';

        $task = new Worker();
        $task->count = 1;
        $task->onWorkerStart = [$this, 'start'];
        $task->onWorkerStop = [$this, 'stop'];
        $task->runAll();
    }

    /**
     * Worker start callback: consume jobs from the configured Beanstalk tube forever.
     *
     * Each job body is expected to be a JSON object with a "cmd" key. Jobs with
     * a recognised command are dispatched to their handler and then deleted.
     * Jobs that are malformed (not a JSON object / no "cmd") or carry an
     * unsupported command are logged and deleted, since retrying them can never
     * succeed. If handling throws, the job is released back to the queue so it
     * can be retried later.
     */
    public function start()
    {
        $tube = Config::get('app.beanstalk_tube');
        Beanstalk::watch($tube);

        while (true) {
            // A null result means no job arrived within the timeout
            // (presumably 30 seconds); simply poll again.
            $job = Beanstalk::reserveWithTimeout(30);
            if (is_null($job)) {
                continue;
            }

            try {
                $rawPayload = $job->getData();
                $payload = json_decode($rawPayload, true);

                // Guard against poison messages: without this check, indexing a
                // non-array payload can raise an error that lands in the catch
                // below and gets the same broken job released and retried forever.
                if (!is_array($payload) || !isset($payload['cmd'])) {
                    Log::error('task unsupported cmd:' . $rawPayload);
                    Beanstalk::delete($job);
                    continue;
                }

                switch ($payload['cmd']) {
                    case WechatNotify::CMD:
                        WechatNotify::exec($payload); // TODO: NOT yet check return value
                        break;
                    default:
                        Log::error('task unsupported cmd:' . $rawPayload);
                }

                Beanstalk::delete($job);
            } catch (\Throwable $e) {
                // Catch \Throwable rather than \Exception so that an \Error raised
                // by a handler (e.g. a TypeError) does not kill the consumer loop
                // while the job is still reserved.
                Log::error('task exception:' . $e->getMessage() . ' data:' . $job->getData());
                // Put the job back for a later retry (presumably priority 1024,
                // delay 30 seconds -- confirm against the Beanstalk wrapper).
                Beanstalk::release($job, 1024, 30);
            }
        } // while
    }

    /**
     * Worker stop callback: print a short notice to standard output.
     */
    public function stop()
    {
        echo 'task stopped';
    }
}