| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120 |
- <?php
- namespace crmeb\command;
- use crmeb\services\async\task\WechatNotify;
- use crmeb\services\async\task\AsyncSms;
- use crmeb\services\async\task\AsyncClass;
- use think\console\Command;
- use think\console\Input;
- use think\console\input\Argument;
- use think\console\input\Option;
- use think\console\Output;
- use think\facade\Config;
- use think\facade\Log;
- use Workerman\Worker;
- use crmeb\utils\Beanstalk;
/**
 * Console command `task`: executes asynchronous tasks pulled from beanstalkd.
 * The redis-based way of running asynchronous tasks is being phased out.
 *
 * Example job payloads, as documented by the original author:
 *
 * tube: sms
 * {
 *   "cmd": "sms",
 *   "params": {
 *     "phone": '',
 *     "template": '',
 *     "content": ''
 *   }
 * }
 * tube: sql
 * {"static":true, "class":"UserModel", "func": "insert", "args":{}}
 */
class Task extends Command
{
    /**
     * Declares the command name, its required `status` argument
     * (start/stop/status) and the `d` option (run as daemon).
     */
    protected function configure()
    {
        $this->setName('task')
            ->addArgument('status', Argument::REQUIRED, 'start/stop/status')
            ->addOption('d', null, Option::VALUE_NONE, 'run as daemon')
            ->setDescription('start/stop/status async task executor');
    }

    /**
     * Rewrites the global $argv from the parsed console input before the
     * Workerman worker is started.
     *
     * NOTE(review): presumably Workerman reads its action (start/stop/status)
     * and the -d flag from the global $argv - confirm against the Workerman
     * version in use.
     *
     * @param Input  $input  parsed console input
     * @param Output $output console output (unused)
     */
    protected function init(Input $input, Output $output)
    {
        global $argv;

        // Slot 1 carries the action; falls back to 'start' when the argument is empty.
        $argv[1] = $input->getArgument('status') ?: 'start';

        // NOTE(review): this relies on hasOption() reporting whether -d was
        // actually passed on the command line - verify against think\console\Input.
        if ($input->hasOption('d')) {
            $argv[2] = '-d';
        } else {
            unset($argv[2]);
        }
    }

    /**
     * Command entry point: configures a single Workerman worker whose start
     * and stop callbacks are this object's start() and stop(), then runs it.
     *
     * @param Input  $input  parsed console input
     * @param Output $output console output
     */
    protected function execute(Input $input, Output $output)
    {
        $this->init($input, $output);

        // The pid file lives in the application root directory.
        Worker::$pidFile = app()->getRootPath() . 'task.pid';

        $task = new Worker();
        $task->count = 1;
        $task->onWorkerStart = [$this, 'start'];
        $task->onWorkerStop = [$this, 'stop'];
        $task->runAll();
    }

    /**
     * Worker start callback: watches the configured beanstalk tube and
     * processes jobs in an endless loop.
     *
     * Every reserved job is deleted after one attempt, whether it succeeded,
     * had an unsupported/invalid payload, or failed with an exception. The
     * original author left a `Beanstalk::release($job, 1024, 30)` retry
     * commented out, so retrying is intentionally not done here.
     */
    public function start()
    {
        $tube = Config::get('app.beanstalk_tube');
        Beanstalk::watch($tube);

        while (true) {
            $job = Beanstalk::reserveWithTimeout(30);
            if (is_null($job)) {
                // Nothing was reserved within the timeout; poll again.
                continue;
            }

            try {
                $rawPayload = $job->getData();
                if (!$this->runJob($rawPayload)) {
                    Log::error('task unsupported cmd:' . $rawPayload);
                }
                Beanstalk::delete($job);
            } catch (\Throwable $e) {
                // \Throwable rather than \Exception: an \Error (e.g. TypeError)
                // raised by a task must not escape the loop and take the worker
                // down while the job is still reserved and undeleted.
                Log::error('task exception:' . $e->getMessage() . ' data:' . $job->getData());
                Beanstalk::delete($job);
            }
        }
    }

    /**
     * Worker stop callback: prints a short notice.
     */
    public function stop()
    {
        echo 'task stopped';
    }

    /**
     * Decodes a job payload and runs every async task whose command matches
     * the payload's "cmd" field.
     *
     * @param string $rawPayload JSON text taken from the job
     * @return bool true when at least one task handled the payload; false when
     *              the payload is not a JSON object/array, has no "cmd", or no
     *              task matches it
     */
    private function runJob($rawPayload)
    {
        $payload = json_decode($rawPayload, true);

        // Malformed JSON or a missing "cmd" is treated as an unsupported command
        // instead of indexing into a non-array value.
        if (!is_array($payload) || !isset($payload['cmd'])) {
            return false;
        }

        $handled = false;
        foreach ($this->buildAsyncTasks() as $asyncTask) {
            // Loose comparison is kept on purpose: the return type of getCmd()
            // is not visible from this file.
            if ($asyncTask->getCmd() == $payload['cmd']) {
                $result = $asyncTask->exec($payload);
                if (!$result) {
                    Log::warning('async task executed return false:' . $rawPayload);
                }
                Log::info("task: $rawPayload return:" . json_encode($result));
                $handled = true;
            }
        }

        return $handled;
    }

    /**
     * Builds a fresh set of task handlers for one job, in the same order the
     * handlers were originally tried.
     *
     * @return array list of task objects exposing getCmd() and exec()
     */
    private function buildAsyncTasks()
    {
        return [
            new WechatNotify(),
            new AsyncClass(),
            new AsyncSms(),
        ];
    }
}
|