Task.php 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. <?php
  2. namespace crmeb\command;
  3. use crmeb\services\async\task\WechatNotify;
  4. use think\console\Command;
  5. use think\console\Input;
  6. use think\console\input\Argument;
  7. use think\console\input\Option;
  8. use think\console\Output;
  9. use think\facade\Config;
  10. use think\facade\Log;
  11. use Workerman\Worker;
  12. use crmeb\utils\Beanstalk;
  /**
   * Console command that runs asynchronous tasks pulled from beanstalkd.
   * The older approach of running asynchronous tasks through redis is being
   * phased out in favour of this one.
   *
   * Example job payloads (from the original notes; the dispatcher in
   * handleJob() only routes the commands listed in its switch statement):
   *
   * tube: sms
   * {
   *   "cmd": "sms",
   *   "params": {
   *     "phone": "",
   *     "template": "",
   *     "content": ""
   *   }
   * }
   * tube: sql
   * {"static":true, "class":"UserModel", "func": "insert", "args":{}}
   */
  class Task extends Command
  {
      /**
       * Declares the command name, its required "status" argument
       * (start/stop/status) and the "d" flag used to request daemon mode.
       */
      protected function configure()
      {
          $this->setName('task')
              ->addArgument('status', Argument::REQUIRED, 'start/stop/status')
              ->addOption('d', null, Option::VALUE_NONE, 'run as daemon')
              ->setDescription('start/stop/status async task executor');
      }

      /**
       * Rewrites the global $argv so that slot 1 holds the requested action and
       * slot 2 holds "-d" only when daemon mode was asked for.
       *
       * NOTE(review): this layout is presumably what Workerman reads when it
       * parses the command line — confirm against the Workerman version in use.
       *
       * @param Input  $input  parsed console input
       * @param Output $output console output (unused here)
       */
      protected function init(Input $input, Output $output)
      {
          global $argv;

          $argv[1] = $input->getArgument('status') ?: 'start';

          // hasOption() is satisfied by the option merely being declared in
          // configure(), so it cannot tell whether the flag was actually passed.
          // getOption() returns the real state of a VALUE_NONE flag.
          if ($input->getOption('d')) {
              $argv[2] = '-d';
          } else {
              unset($argv[2]);
          }
      }

      /**
       * Entry point of the command: prepares $argv, points Workerman at the pid
       * file under the application root and runs a single worker whose start and
       * stop callbacks are start() and stop().
       *
       * @param Input  $input  parsed console input
       * @param Output $output console output
       */
      protected function execute(Input $input, Output $output)
      {
          $this->init($input, $output);

          Worker::$pidFile = app()->getRootPath() . 'task.pid';

          $task = new Worker();
          $task->count = 1;
          $task->onWorkerStart = [$this, 'start'];
          $task->onWorkerStop = [$this, 'stop'];

          Worker::runAll();
      }

      /**
       * Worker start callback: watches the tube configured under
       * "app.beanstalk_tube" and processes reserved jobs in an endless loop.
       * This method never returns on its own.
       */
      public function start()
      {
          $tube = Config::get('app.beanstalk_tube');
          Beanstalk::watch($tube);

          while (true) {
              // 30 is presumably a timeout in seconds; a null result means
              // nothing was reserved, so simply poll again.
              $job = Beanstalk::reserveWithTimeout(30);
              if (is_null($job)) {
                  continue;
              }

              $this->handleJob($job);
          }
      }

      /**
       * Worker stop callback: prints a short notice.
       */
      public function stop()
      {
          echo 'task stopped';
      }

      /**
       * Decodes one reserved job, dispatches it by its "cmd" field and deletes
       * it afterwards. Any failure while doing so is logged and the job is
       * released back to the queue for a later retry.
       *
       * @param mixed $job job object as returned by Beanstalk::reserveWithTimeout();
       *                   it must expose getData()
       */
      private function handleJob($job)
      {
          try {
              $str_payload = $job->getData();
              $payload = json_decode($str_payload, true);

              if (!is_array($payload) || !isset($payload['cmd'])) {
                  // A payload without a usable "cmd" can never succeed, so it is
                  // logged and dropped like any other unsupported command rather
                  // than being retried.
                  Log::error('task unsupported cmd:' . $str_payload);
              } else {
                  switch ($payload['cmd']) {
                      case WechatNotify::CMD:
                          WechatNotify::exec($payload); // TODO: NOT yet check return value
                          break;
                      default:
                          Log::error('task unsupported cmd:' . $str_payload);
                  }
              }

              Beanstalk::delete($job);
          } catch (\Throwable $e) {
              // \Throwable also covers \Error (e.g. TypeError) raised by a handler,
              // which would otherwise escape the loop without the job being
              // logged or released.
              Log::error('task exception:' . $e->getMessage() . ' data:' . $job->getData());
              // presumably priority 1024 with a 30 second delay — confirm against
              // the Beanstalk wrapper.
              Beanstalk::release($job, 1024, 30);
          }
      }
  }