Task.php 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. <?php
  2. namespace tw\async\tasks;
  3. use crmeb\utils\Beanstalk;
  4. use think\facade\Config;
  5. /**
  6. * 这个 Task 表示异步任务的基类, 借助于 beanstalkd 等服务来异步执行或延后执行
  7. *
  8. * HOWTO
  9. *
  10. * 首先根据特定的任务,实现一个子类, 比如 SomeTask
  11. *
  12. * class SomeTask extends Task {
  13. * //客户端调用,异步执行
  14. * public function push($param1, $param2, $param3) {
  15. * self::put([
  16. * 'param1' => $param1,
  17. * 'param2' => $param2,
  18. * 'param3' => $param3
  19. * ]);
  20. * }
  21. *
  22. * protected function getCmd() {
  23. * return 'some_task';
  24. * }
  25. *
  26. * protected function exec(array $param) {
  27. * //Do something
  28. * }
  29. * }
  30. *
  31. * // 消费任务的代码实现
  32. * // read from beanstalk
  33. * SomeTask::exec($param)
  34. *
  35. * 增加任务到 beanstalkd 代码
  36. */
  37. abstract class Task {
  38. /**
  39. * 虚函数。返回任务 cmd 标识, 任务类别
  40. *
  41. * @return string
  42. */
  43. abstract public function getCmd() : string;
  44. /**
  45. * 虚函数。执行从 beanstalkd 中取出的任务数据 $task.
  46. *
  47. * @return boolean
  48. */
  49. abstract protected function _exec(array $params);
  50. public function exec($task)
  51. {
  52. if (!$this->checkTask($task)) {
  53. return false;
  54. }
  55. return $this->_exec($task['params']);
  56. }
  57. /**
  58. * 任务自由的数据都叫 params
  59. * 这个函数打包成任务数据
  60. * @return array
  61. */
  62. protected function formatJob(array $params): array
  63. {
  64. return [
  65. 'cmd' => $this->getCmd(),
  66. 'ts' => time(),
  67. 'sender' => '0',
  68. 'params' => $params,
  69. ];
  70. }
  71. /**
  72. * 插入任务到 beanstalk
  73. *
  74. * @params 具体某个任务需要的参数
  75. * @return boolean
  76. */
  77. protected function put(array $params): bool
  78. {
  79. $bean = Beanstalk::instance();
  80. $tube = Config::get('app.beanstalk_tube', 'twong');
  81. $arr = $this->formatJob($params);
  82. $json = json_encode($arr);
  83. try {
  84. $bean::useTube($tube)->put($json);
  85. } catch (\Exception $e) {
  86. errlog('beanstalk put failed: ' . $json . ' error:' . $e->getMessage());
  87. return false;
  88. }
  89. return true;
  90. }
  91. /**
  92. * job 格式基本检查
  93. *
  94. * @return boolean
  95. */
  96. protected function checkTask($task)
  97. {
  98. if (!$task || !isset($task['cmd']) || $task['cmd'] != $this->getCmd()) {
  99. errlog('task format error: null or invalid cmd');
  100. return false;
  101. }
  102. if (!isset($task['params']) || !is_array($task['params'])) {
  103. errlog('task format error: without params or not an array.');
  104. return false;
  105. }
  106. return true;
  107. }
  108. }