Task.php 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. <?php
  2. namespace crmeb\command;
  3. use crmeb\services\async\task\WechatNotify;
  4. use crmeb\services\async\task\AsyncSms;
  5. use crmeb\services\async\task\AsyncClass;
  6. use think\console\Command;
  7. use think\console\Input;
  8. use think\console\input\Argument;
  9. use think\console\input\Option;
  10. use think\console\Output;
  11. use think\facade\Config;
  12. use think\facade\Log;
  13. use Workerman\Worker;
  14. use crmeb\utils\Beanstalk;
  /**
   * Console command that runs the asynchronous task executor, backed by beanstalkd.
   * The older approach of using redis for asynchronous tasks is being phased out.
   *
   * Example job payloads:
   *
   * tube: sms
   * {
   *     "cmd": "sms",
   *     "params": {
   *         "phone": "",
   *         "template": "",
   *         "content": ""
   *     }
   * }
   *
   * tube: sql
   * {"static": true, "class": "UserModel", "func": "insert", "args": {}}
   */
  class Task extends Command
  {
      /**
       * Registers the command as `task` with a required `status` argument
       * (start/stop/status) and a value-less option named `d` (no shortcut)
       * that requests daemon mode.
       */
      protected function configure()
      {
          $this->setName('task')
              ->addArgument('status', Argument::REQUIRED, 'start/stop/status')
              ->addOption('d', null, Option::VALUE_NONE, 'run as daemon')
              ->setDescription('start/stop/status async task executor');
      }

      /**
       * Rewrites the global $argv so that slot 1 holds the requested status
       * and slot 2 holds `-d` when the `d` option is present.
       *
       * NOTE(review): this is presumably done because Workerman parses its
       * command and daemon flag from the global $argv — confirm against the
       * Workerman version in use.
       *
       * @param Input  $input  Parsed console input.
       * @param Output $output Console output (unused here).
       */
      protected function init(Input $input, Output $output)
      {
          global $argv;

          // `?:` on purpose: an empty status string also falls back to 'start'.
          $argv[1] = $input->getArgument('status') ?: 'start';

          if ($input->hasOption('d')) {
              $argv[2] = '-d';
          } else {
              unset($argv[2]);
          }
      }

      /**
       * Command entry point: prepares $argv, points Workerman's pid file at
       * `<root>/task.pid`, and starts a single worker whose start/stop
       * callbacks are this object's start() and stop() methods.
       *
       * @param Input  $input  Parsed console input.
       * @param Output $output Console output.
       */
      protected function execute(Input $input, Output $output)
      {
          $this->init($input, $output);

          Worker::$pidFile = app()->getRootPath() . 'task.pid';

          $task = new Worker();
          $task->count = 1;
          $task->onWorkerStart = [$this, 'start'];
          $task->onWorkerStop = [$this, 'stop'];
          $task->runAll();
      }

      /**
       * Worker start callback: watches the tube configured under
       * `app.beanstalk_tube` and processes jobs in an endless loop.
       *
       * Every reserved job is deleted after it has been handled, whether the
       * handler succeeded, returned a falsy value, the command was unknown,
       * or an error was thrown. Failed jobs are therefore NOT retried.
       */
      public function start()
      {
          $tube = Config::get('app.beanstalk_tube');
          Beanstalk::watch($tube);

          while (true) {
              $job = Beanstalk::reserveWithTimeout(30);
              if (is_null($job)) {
                  // Nothing arrived within the timeout; poll again.
                  continue;
              }

              try {
                  $this->runJob($job->getData());
              } catch (\Throwable $e) {
                  // Catch \Throwable, not just \Exception: an \Error (for example a
                  // TypeError raised inside a handler) must not escape the loop and
                  // take the worker down with the job still reserved.
                  Log::error('task exception:' . $e->getMessage() . ' data:' . $job->getData());
              }

              // Single removal point for all outcomes (success, unsupported, failure).
              Beanstalk::delete($job);
          }
      }

      /**
       * Decodes one job payload and hands it to every handler whose command
       * matches the payload's `cmd` field.
       *
       * @param mixed $rawPayload Job body as returned by the job's getData();
       *                          presumably a JSON string — TODO confirm.
       */
      private function runJob($rawPayload)
      {
          $payload = json_decode($rawPayload, true);

          // Reject anything that is not a JSON object carrying a scalar `cmd`,
          // instead of relying on PHP notices for a null/non-array payload.
          if (!is_array($payload) || !isset($payload['cmd']) || !is_scalar($payload['cmd'])) {
              Log::error('task invalid payload:' . $rawPayload);
              return;
          }
          $cmd = (string) $payload['cmd'];

          // Handlers are created per job, inside the caller's try block, so a
          // failing constructor is logged like any other task error.
          $handlers = [
              new WechatNotify(),
              new AsyncClass(),
              new AsyncSms(),
          ];

          $executed = false;
          foreach ($handlers as $handler) {
              // Compare as strings with ===; a loose == would let values such as
              // true or 0 match unrelated handler commands.
              if ((string) $handler->getCmd() !== $cmd) {
                  continue;
              }

              $retval = $handler->exec($payload);
              if (!$retval) {
                  Log::warning('async task executed return false:' . $rawPayload);
              }
              Log::info("task: $rawPayload return:" . json_encode($retval));
              $executed = true;
          }

          if (!$executed) {
              Log::error('task unsupported cmd:' . $rawPayload);
          }
      }

      /**
       * Worker stop callback: prints a short notice to standard output.
       */
      public function stop()
      {
          echo 'task stopped';
      }
  }