Task.php 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. <?php
  2. namespace crmeb\command;
  3. use tw\async\tasks\WechatNotify;
  4. use tw\async\tasks\AsyncSms;
  5. use tw\async\tasks\AsyncClass;
  6. use think\console\Command;
  7. use think\console\Input;
  8. use think\console\input\Argument;
  9. use think\console\input\Option;
  10. use think\console\Output;
  11. use think\facade\Config;
  12. use Workerman\Worker;
  13. use crmeb\utils\Beanstalk;
  14. /**
  15. * 用于执行异步任务,基于 beanstalkd
  16. * 用 redis 做异步任务的方式逐步废除
  17. *
  18. * tube: sms
  19. * {
  20. * "cmd": "sms",
  21. * "params": {
  22. * "phone": '',
  23. * "template": '',
  24. * "content": ''
  25. * }
  26. * }
  27. * tube: sql
  28. * {"static":true, "class":"UserModel", "func": "insert", "args":{}}
  29. *
  30. */
  31. class Task extends Command
  32. {
  33. protected function configure()
  34. {
  35. $this->setName('task')
  36. ->addArgument('status', Argument::REQUIRED, 'start/stop/status')
  37. ->addOption('d', null, Option::VALUE_NONE, 'run as daemon')
  38. ->setDescription('start/stop/status async task executor');
  39. }
  40. protected function init(Input $input, Output $output)
  41. {
  42. global $argv;
  43. $argv[1] = $input->getArgument('status') ?: 'start';
  44. if ($input->hasOption('d')) {
  45. $argv[2] = '-d';
  46. } else {
  47. unset($argv[2]);
  48. }
  49. }
  50. protected function execute(Input $input, Output $output)
  51. {
  52. $this->init($input, $output);
  53. Worker::$pidFile = app()->getRootPath() . 'task.pid';
  54. $task = new Worker();
  55. $task->count = 1;
  56. $task->onWorkerStart = [$this, 'start'];
  57. $task->onWorkerStop = [$this, 'stop'];
  58. $task->runAll();
  59. }
  60. public function start()
  61. {
  62. $tube = Config::get('app.beanstalk_tube');
  63. Beanstalk::watch($tube);
  64. while (true) {
  65. $job = Beanstalk::reserveWithTimeout(30);
  66. if (is_null($job)) {
  67. continue;
  68. }
  69. $executed = false;
  70. try {
  71. $str_payload = $job->getData();
  72. $payload = json_decode($str_payload, true);
  73. $asyncTasks = [
  74. new WechatNotify(),
  75. new AsyncClass(),
  76. new AsyncSms(),
  77. ];
  78. foreach ($asyncTasks as $t) {
  79. if ($t->getCmd() == $payload['cmd']) {
  80. $retval = $t->exec($payload);
  81. if (!$retval) {
  82. warnlog('async task executed return false:' . $str_payload);
  83. }
  84. infolog("task: $str_payload return:" . json_encode($retval));
  85. $executed = true;
  86. }
  87. }
  88. if (!$executed) {
  89. errlog('task unsupported cmd:' . $str_payload);
  90. // Beanstalk::release($job, 1024, 30);
  91. Beanstalk::delete($job);
  92. continue;
  93. }
  94. Beanstalk::delete($job);
  95. } catch (\Exception $e) {
  96. errlog('task exception:' . $e->getMessage() . ' data:' . $job->getData());
  97. // Beanstalk::release($job, 1024, 30);
  98. Beanstalk::delete($job); // testing
  99. }
  100. } // while
  101. }
  102. public function stop()
  103. {
  104. echo 'task stopped';
  105. }
  106. }